qrlew/differential_privacy/
mod.rs

1//! # Methods to transform `Relation`s into differentially private ones
2//!
//! This is experimental and has not been extensively tested yet.
4//!
5
6pub mod aggregates;
7pub mod dp_event;
8pub mod dp_parameters;
9pub mod group_by;
10
11use crate::{
12    builder::With,
13    expr,
14    privacy_unit_tracking::{self, PupRelation},
15    relation::{rewriting, Reduce, Relation, Variant},
16    Ready,
17};
18use std::{error, fmt, ops::Deref, result};
19
20pub use dp_event::DpEvent;
21/// Some exports
22pub use dp_parameters::DpParameters;
23
24use self::aggregates::DpAggregatesParameters;
25
/// Errors raised by the differential privacy rewriting.
///
/// Every variant carries a human-readable `String`; see the `Display`
/// implementation below for how each one is rendered.
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
    /// Display form of the relation that was rejected.
    InvalidRelation(String),
    /// Description rendered after the `DPCompilationError: ` prefix.
    DPCompilationError(String),
    /// Description rendered after the `GroupingKeysError: ` prefix.
    GroupingKeysError(String),
    /// Description rendered after the `BudgetError: ` prefix.
    BudgetError(String),
    /// Any other message, rendered as is.
    Other(String),
}

impl Error {
    /// Builds an [`Error::InvalidRelation`] for `relation`.
    ///
    /// Only the display form of `relation` is stored. The `Display`
    /// implementation appends the " invalid." suffix itself, so storing a
    /// pre-formatted "... is invalid" sentence here would render as
    /// "... is invalid invalid.".
    pub fn invalid_relation(relation: impl fmt::Display) -> Error {
        Error::InvalidRelation(relation.to_string())
    }
}

impl fmt::Display for Error {
    /// Writes the error on a single line.
    ///
    /// Every variant is terminated by a newline (`writeln!`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::InvalidRelation(relation) => writeln!(f, "{relation} invalid."),
            Error::DPCompilationError(desc) => writeln!(f, "DPCompilationError: {}", desc),
            Error::GroupingKeysError(desc) => writeln!(f, "GroupingKeysError: {}", desc),
            Error::BudgetError(desc) => writeln!(f, "BudgetError: {}", desc),
            Error::Other(err) => writeln!(f, "{}", err),
        }
    }
}
52
53impl From<expr::Error> for Error {
54    fn from(err: expr::Error) -> Self {
55        Error::Other(err.to_string())
56    }
57}
58impl From<rewriting::Error> for Error {
59    fn from(err: rewriting::Error) -> Self {
60        Error::Other(err.to_string())
61    }
62}
63impl From<privacy_unit_tracking::Error> for Error {
64    fn from(err: privacy_unit_tracking::Error) -> Self {
65        Error::Other(err.to_string())
66    }
67}
68
69impl error::Error for Error {}
70pub type Result<T> = result::Result<T, Error>;
71
72/// A DP Relation
73#[derive(Clone, Debug)]
74pub struct DpRelation {
75    relation: Relation,
76    dp_event: DpEvent,
77}
78
79impl From<DpRelation> for Relation {
80    fn from(value: DpRelation) -> Self {
81        value.relation
82    }
83}
84
85impl DpRelation {
86    pub fn new(relation: Relation, dp_event: DpEvent) -> Self {
87        DpRelation { relation, dp_event }
88    }
89
90    pub fn relation(&self) -> &Relation {
91        &self.relation
92    }
93
94    pub fn dp_event(&self) -> &DpEvent {
95        &self.dp_event
96    }
97}
98
99impl Deref for DpRelation {
100    type Target = Relation;
101
102    fn deref(&self) -> &Self::Target {
103        &self.relation
104    }
105}
106
107impl From<DpRelation> for (Relation, DpEvent) {
108    fn from(value: DpRelation) -> Self {
109        (value.relation, value.dp_event)
110    }
111}
112
113impl From<(Relation, DpEvent)> for DpRelation {
114    fn from(value: (Relation, DpEvent)) -> Self {
115        DpRelation::new(value.0, value.1)
116    }
117}
118
119impl Reduce {
120    /// Rewrite a `Reduce` into DP:
121    ///     - Protect the grouping keys
122    ///     - Add noise on the aggregations
123    pub fn differentially_private(self, parameters: &DpParameters) -> Result<DpRelation> {
124        let mut dp_event = DpEvent::no_op();
125        let max_size = self.size().max().unwrap().clone();
126        let pup_input = PupRelation::try_from(self.input().clone())?;
127        let privacy_unit_unique =
128            pup_input.schema()[pup_input.privacy_unit()].has_unique_or_primary_key_constraint();
129
130        // DP rewrite group by
131        let reduce_with_dp_group_by = if self.group_by().is_empty() {
132            self
133        } else {
134            let (dp_grouping_values, dp_event_group_by) = self
135                .differentially_private_group_by(
136                    parameters.epsilon * parameters.tau_thresholding_share,
137                    parameters.delta * parameters.tau_thresholding_share,
138                    parameters.max_privacy_unit_groups,
139                )?
140                .into();
141            let input_relation_with_privacy_tracked_group_by = self
142                .input()
143                .clone()
144                .join_with_grouping_values(dp_grouping_values)?;
145            let reduce: Reduce = Reduce::builder()
146                .with(self)
147                .input(input_relation_with_privacy_tracked_group_by)
148                .build();
149            dp_event = dp_event.compose(dp_event_group_by);
150            reduce
151        };
152
153        // if the (epsilon_tau_thresholding, delta_tau_thresholding) budget has
154        // not been spent, allocate it to the aggregations.
155        let aggregation_share = if dp_event.is_no_op() {
156            1.
157        } else {
158            1. - parameters.tau_thresholding_share
159        };
160        let aggregation_parameters =
161            DpAggregatesParameters::from_dp_parameters(parameters.clone(), aggregation_share)
162                .with_size(usize::try_from(max_size).unwrap())
163                .with_privacy_unit_unique(privacy_unit_unique);
164
165        // DP rewrite aggregates
166        let (dp_relation, dp_event_agg) = reduce_with_dp_group_by
167            .differentially_private_aggregates(aggregation_parameters)?
168            .into();
169        dp_event = dp_event.compose(dp_event_agg);
170        Ok((dp_relation, dp_event).into())
171    }
172}
173
174#[cfg(test)]
175mod tests {
176    use super::*;
177    use crate::{
178        ast,
179        builder::With,
180        data_type::{DataType, DataTyped, Variant as _},
181        display::Dot,
182        expr::{AggregateColumn, Expr},
183        io::{postgresql, Database},
184        privacy_unit_tracking::{PrivacyUnit, PrivacyUnitTracking, PupRelation, Strategy},
185        relation::{Constraint, Field, Map, Relation, Schema, Variant},
186    };
187    use std::{collections::HashSet, sync::Arc};
188
189    #[test]
190    fn test_dp_rewrite_reduce_without_group_by() {
191        let mut database = postgresql::test_database();
192        let relations = database.relations();
193        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);
194
195        // privacy track the inputs
196        let table = relations
197            .get(&["item_table".to_string()])
198            .unwrap()
199            .deref()
200            .clone();
201        let privacy_unit_tracking = PrivacyUnitTracking::from((
202            &relations,
203            vec![
204                (
205                    "item_table",
206                    vec![("order_id", "order_table", "id")],
207                    "date",
208                ),
209                ("order_table", vec![], "date"),
210            ],
211            Strategy::Hard,
212        ));
213        let pup_table = privacy_unit_tracking
214            .table(&table.clone().try_into().unwrap())
215            .unwrap();
216        let reduce = Reduce::new(
217            "my_reduce".to_string(),
218            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
219            vec![],
220            pup_table.deref().clone().into(),
221        );
222        let relation = Relation::from(reduce.clone());
223        relation.display_dot().unwrap();
224
225        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
226        dp_relation.display_dot().unwrap();
227        let _mult: f64 = 2000.
228            * DpAggregatesParameters::from_dp_parameters(parameters.clone(), 1.)
229                .privacy_unit_multiplicity();
230        assert!(matches!(
231            dp_event,
232            DpEvent::Gaussian {
233                noise_multiplier: _
234            }
235        ));
236        assert!(dp_relation
237            .data_type()
238            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));
239
240        let query: &str = &ast::Query::from(&dp_relation).to_string();
241        _ = database.query(query).unwrap();
242
243        // input a map
244        let table = relations
245            .get(&["table_1".to_string()])
246            .unwrap()
247            .deref()
248            .clone();
249        let privacy_unit_tracking = PrivacyUnitTracking::from((
250            &relations,
251            vec![("table_1", vec![], PrivacyUnit::privacy_unit_row())],
252            Strategy::Hard,
253        ));
254        let pup_table = privacy_unit_tracking
255            .table(&table.clone().try_into().unwrap())
256            .unwrap();
257        let map = Map::new(
258            "my_map".to_string(),
259            vec![("my_d".to_string(), expr!(d / 100))],
260            None,
261            vec![],
262            None,
263            None,
264            Arc::new(table.into()),
265        );
266        let pup_map = privacy_unit_tracking
267            .map(
268                &map.clone().try_into().unwrap(),
269                PupRelation(Relation::from(pup_table)),
270            )
271            .unwrap();
272        let reduce = Reduce::new(
273            "my_reduce".to_string(),
274            vec![("sum_d".to_string(), AggregateColumn::sum("my_d"))],
275            vec![],
276            pup_map.deref().clone().into(),
277        );
278        let relation = Relation::from(reduce.clone());
279        relation.display_dot().unwrap();
280
281        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
282        dp_relation.display_dot().unwrap();
283        assert!(dp_event.is_no_op()); // private query is null beacause we have computed the sum of zeros
284        assert_eq!(
285            dp_relation.data_type(),
286            DataType::structured([("sum_d", DataType::float_value(0.))])
287        );
288
289        let query: &str = &ast::Query::from(&dp_relation).to_string();
290        _ = database.query(query).unwrap();
291    }
292
293    #[test]
294    fn test_dp_rewrite_reduce_group_by_possible_values() {
295        let mut database = postgresql::test_database();
296        let relations = database.relations();
297        let table = relations
298            .get(&["item_table".to_string()])
299            .unwrap()
300            .deref()
301            .clone();
302        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);
303
304        // privacy track the inputs
305        let privacy_unit_tracking = PrivacyUnitTracking::from((
306            &relations,
307            vec![
308                (
309                    "item_table",
310                    vec![("order_id", "order_table", "id")],
311                    "date",
312                ),
313                ("order_table", vec![], "date"),
314            ],
315            Strategy::Hard,
316        ));
317        let pup_table = privacy_unit_tracking
318            .table(&table.try_into().unwrap())
319            .unwrap();
320        let map: Map = Relation::map()
321            .with(("order_id", expr!(order_id)))
322            .with(("price", expr!(price)))
323            .filter(Expr::in_list(
324                Expr::col("order_id"),
325                Expr::list(vec![1, 2, 3, 4, 5]),
326            ))
327            .input(pup_table.deref().clone())
328            .build();
329        let pup_map = privacy_unit_tracking
330            .map(&map.try_into().unwrap(), pup_table)
331            .unwrap();
332
333        let reduce = Reduce::new(
334            "my_reduce".to_string(),
335            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
336            vec!["order_id".into()],
337            pup_map.deref().clone().into(),
338        );
339        let relation = Relation::from(reduce.clone());
340        relation.display_dot().unwrap();
341
342        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
343        dp_relation.display_dot().unwrap();
344        assert!(matches!(
345            dp_event,
346            DpEvent::Gaussian {
347                noise_multiplier: _
348            }
349        ));
350        assert!(dp_relation
351            .data_type()
352            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));
353
354        let query: &str = &ast::Query::from(&dp_relation).to_string();
355        println!("{query}");
356        _ = database.query(query).unwrap();
357    }
358
359    #[test]
360    fn test_dp_rewrite_reduce_group_by_tau_thresholding() {
361        let mut database = postgresql::test_database();
362        let relations = database.relations();
363        let table = relations
364            .get(&["item_table".to_string()])
365            .unwrap()
366            .deref()
367            .clone();
368        let parameters =
369            DpParameters::from_epsilon_delta(100., 1e-3).with_max_privacy_unit_groups(10);
370
371        // privacy track the inputs
372        let privacy_unit_tracking = PrivacyUnitTracking::from((
373            &relations,
374            vec![
375                (
376                    "item_table",
377                    vec![("order_id", "order_table", "id")],
378                    "date",
379                ),
380                ("order_table", vec![], "date"),
381            ],
382            Strategy::Hard,
383        ));
384        let pup_table = privacy_unit_tracking
385            .table(&table.try_into().unwrap())
386            .unwrap();
387        let map: Map = Relation::map()
388            .with(("order_id", expr!(order_id)))
389            .with(("price", expr!(price)))
390            .input(pup_table.deref().clone())
391            .build();
392        let pup_map = privacy_unit_tracking
393            .map(&map.try_into().unwrap(), pup_table)
394            .unwrap();
395
396        let reduce = Reduce::new(
397            "my_reduce".to_string(),
398            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
399            vec!["order_id".into()],
400            pup_map.deref().clone().into(),
401        );
402        let relation = Relation::from(reduce.clone());
403        relation.display_dot().unwrap();
404
405        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
406        dp_relation.display_dot().unwrap();
407        assert!(matches!(dp_event, DpEvent::Composed { events: _ }));
408        assert!(dp_relation
409            .data_type()
410            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));
411
412        let query: &str = &ast::Query::from(&dp_relation).to_string();
413        println!("{query}");
414        let res = database.query(query).unwrap();
415        println!("\n{:?}", res);
416    }
417
418    #[test]
419    fn test_dp_rewrite_reduce_group_by_possible_both() {
420        let mut database = postgresql::test_database();
421        let relations = database.relations();
422        let table = relations
423            .get(&["item_table".to_string()])
424            .unwrap()
425            .deref()
426            .clone();
427        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);
428
429        // privacy track the inputs
430        let privacy_unit_tracking = PrivacyUnitTracking::from((
431            &relations,
432            vec![
433                (
434                    "item_table",
435                    vec![("order_id", "order_table", "id")],
436                    "date",
437                ),
438                ("order_table", vec![], "date"),
439            ],
440            Strategy::Hard,
441        ));
442        let pup_table = privacy_unit_tracking
443            .table(&table.try_into().unwrap())
444            .unwrap();
445        let map: Map = Relation::map()
446            .with(("order_id", expr!(order_id)))
447            .with(("item", expr!(item)))
448            .with(("price", expr!(price)))
449            .filter(Expr::in_list(
450                Expr::col("order_id"),
451                Expr::list(vec![1, 2, 3, 4, 5]),
452            ))
453            .input(pup_table.deref().clone())
454            .build();
455        let pup_map = privacy_unit_tracking
456            .map(&map.try_into().unwrap(), pup_table)
457            .unwrap();
458
459        let reduce = Reduce::new(
460            "my_reduce".to_string(),
461            vec![
462                ("item".to_string(), AggregateColumn::first("item")),
463                ("order_id".to_string(), AggregateColumn::first("order_id")),
464                ("sum_price".to_string(), AggregateColumn::sum("price")),
465            ],
466            vec!["order_id".into(), "item".into()],
467            pup_map.deref().clone().into(),
468        );
469        let relation = Relation::from(reduce.clone());
470        relation.display_dot().unwrap();
471
472        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
473        dp_relation.display_dot().unwrap();
474        assert!(matches!(dp_event, DpEvent::Composed { events: _ }));
475        assert!(dp_relation.schema()[0]
476            .data_type()
477            .is_subset_of(&DataType::text()));
478        assert_eq!(
479            dp_relation.schema()[1].data_type(),
480            DataType::integer_values(vec![1, 2, 3, 4, 5])
481        );
482        assert!(dp_relation.schema()[2]
483            .data_type()
484            .is_subset_of(&DataType::float()));
485
486        let query: &str = &ast::Query::from(&dp_relation).to_string();
487        println!("{query}");
488        _ = database.query(query).unwrap();
489    }
490
491    #[test]
492    fn test_differentially_private_output_all_grouping_keys_simple() {
493        // test the results contains all the possible keys
494        let mut database = postgresql::test_database();
495        let relations = database.relations();
496        let table = relations
497            .get(&["large_user_table".into()])
498            .unwrap()
499            .as_ref()
500            .clone();
501        let new_schema: Schema = table
502            .schema()
503            .iter()
504            .map(|f| {
505                if f.name() == "city" {
506                    Field::from_name_data_type("city", DataType::text())
507                } else {
508                    f.clone()
509                }
510            })
511            .collect();
512        let table: Relation = Relation::table()
513            .path(["large_user_table"])
514            .name("more_users")
515            .size(100000)
516            .schema(new_schema)
517            .build();
518        let input: Relation = Relation::map()
519            .name("map_relation")
520            .with(("income", expr!(income)))
521            .with(("city", expr!(city)))
522            .with((PrivacyUnit::privacy_unit(), expr!(id)))
523            .with((PrivacyUnit::privacy_unit_weight(), expr!(id)))
524            .filter(Expr::in_list(
525                Expr::col("city"),
526                Expr::list(vec!["Paris".to_string(), "London".to_string()]),
527            ))
528            .input(table.clone())
529            .build();
530        let reduce: Reduce = Relation::reduce()
531            .name("reduce_relation")
532            .with(("city".to_string(), AggregateColumn::first("city")))
533            .with(("count_income".to_string(), AggregateColumn::count("income")))
534            .group_by(expr!(city))
535            .input(input)
536            .build();
537        let (dp_relation, dp_event) = reduce
538            .differentially_private(&DpParameters::from_epsilon_delta(10., 1e-5))
539            .unwrap()
540            .into();
541        println!("{}", dp_event);
542        dp_relation.display_dot().unwrap();
543        let query: &str = &ast::Query::from(&dp_relation).to_string();
544        let results = database.query(query).unwrap();
545        println!("results = {:?}", results);
546        let city_keys: HashSet<_> = results
547            .iter()
548            .map(|row| row.to_vec().clone()[0].clone().to_string())
549            .collect();
550        let correct_keys: HashSet<_> = vec!["London".to_string(), "Paris".to_string()]
551            .into_iter()
552            .collect();
553        assert_eq!(city_keys, correct_keys);
554    }
555
556    #[test]
557    fn test_differentially_private_output_all_grouping_keys() {
558        // test the results contains all the possible keys
559        let mut database = postgresql::test_database();
560        let relations = database.relations();
561        let table = relations
562            .get(&["large_user_table".into()])
563            .unwrap()
564            .as_ref()
565            .clone();
566        let new_schema: Schema = table
567            .schema()
568            .iter()
569            .map(|f| {
570                if f.name() == "city" {
571                    Field::from_name_data_type("city", DataType::text())
572                } else {
573                    f.clone()
574                }
575            })
576            .collect();
577        let table: Relation = Relation::table()
578            .path(["large_user_table"])
579            .name("more_users")
580            .size(100000)
581            .schema(new_schema)
582            .build();
583        let input: Relation = Relation::map()
584            .name("map_relation")
585            .with(("income", expr!(income)))
586            .with(("city", expr!(city)))
587            .with(("age", expr!(age)))
588            .with((PrivacyUnit::privacy_unit(), expr!(id)))
589            .with((PrivacyUnit::privacy_unit_weight(), expr!(id)))
590            .filter(Expr::in_list(
591                Expr::col("city"),
592                Expr::list(vec!["Paris".to_string(), "London".to_string()]),
593            ))
594            .input(table.clone())
595            .build();
596        let reduce: Reduce = Relation::reduce()
597            .name("reduce_relation")
598            .with(("city".to_string(), AggregateColumn::first("city")))
599            .with(("age".to_string(), AggregateColumn::first("age")))
600            .with(("sum_income".to_string(), AggregateColumn::sum("income")))
601            .group_by(expr!(city))
602            .group_by(expr!(age))
603            .input(input)
604            .build();
605        let (dp_relation, dp_event) = reduce
606            .differentially_private(&DpParameters::from_epsilon_delta(10., 1e-5))
607            .unwrap()
608            .into();
609        println!("{}", dp_event);
610        dp_relation.display_dot().unwrap();
611        let query: &str = &ast::Query::from(&dp_relation).to_string();
612        let results = database.query(query).unwrap();
613        println!("{:?}", results);
614        let city_keys: HashSet<_> = results
615            .iter()
616            .map(|row| row.to_vec().clone()[0].clone().to_string())
617            .collect();
618        let correct_keys: HashSet<_> = vec!["London".to_string(), "Paris".to_string()]
619            .into_iter()
620            .collect();
621        assert_eq!(city_keys, correct_keys);
622    }
623
624    #[test]
625    fn test_dp_rewrite_reduce() {
626        let mut database = postgresql::test_database();
627        let relations = database.relations();
628        let table = relations
629            .get(&["table_1".to_string()])
630            .unwrap()
631            .deref()
632            .clone();
633        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);
634
635        // privacy track the inputs
636        let privacy_unit_tracking = PrivacyUnitTracking::from((
637            &relations,
638            vec![("table_1", vec![], PrivacyUnit::privacy_unit_row())],
639            Strategy::Hard,
640        ));
641        let pup_table = privacy_unit_tracking
642            .table(&table.try_into().unwrap())
643            .unwrap();
644        let reduce = Reduce::new(
645            "my_reduce".to_string(),
646            vec![
647                ("sum_a".to_string(), AggregateColumn::sum("a")),
648                ("d".to_string(), AggregateColumn::first("d")),
649                ("max_d".to_string(), AggregateColumn::max("d")),
650            ],
651            vec!["d".into()],
652            pup_table.deref().clone().into(),
653        );
654        let relation = Relation::from(reduce.clone());
655        relation.display_dot().unwrap();
656
657        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
658        dp_relation.display_dot().unwrap();
659        assert_eq!(
660            dp_event,
661            DpEvent::epsilon_delta(
662                parameters.epsilon * parameters.tau_thresholding_share,
663                parameters.delta * parameters.tau_thresholding_share
664            )
665            .compose(DpEvent::gaussian_from_epsilon_delta(
666                parameters.epsilon * (1. - parameters.tau_thresholding_share),
667                parameters.delta * (1. - parameters.tau_thresholding_share),
668            ))
669        );
670        let correct_schema: Schema = vec![
671            ("sum_a", DataType::float_interval(0., 100.), None),
672            (
673                "d",
674                DataType::integer_interval(0, 10),
675                Some(Constraint::Unique),
676            ),
677            (
678                "max_d",
679                DataType::integer_interval(0, 10),
680                Some(Constraint::Unique),
681            ),
682        ]
683        .into_iter()
684        .collect();
685        assert_eq!(dp_relation.schema(), &correct_schema);
686
687        let query: &str = &ast::Query::from(&dp_relation).to_string();
688        println!("{query}");
689        _ = database.query(query).unwrap();
690    }
691}