1pub mod aggregates;
7pub mod dp_event;
8pub mod dp_parameters;
9pub mod group_by;
10
11use crate::{
12 builder::With,
13 expr,
14 privacy_unit_tracking::{self, PupRelation},
15 relation::{rewriting, Reduce, Relation, Variant},
16 Ready,
17};
18use std::{error, fmt, ops::Deref, result};
19
20pub use dp_event::DpEvent;
21pub use dp_parameters::DpParameters;
23
24use self::aggregates::DpAggregatesParameters;
25
/// Errors produced by the differential privacy rewriting code of this module.
///
/// Every variant carries a human readable description.
#[derive(Debug, PartialEq, Clone)]
pub enum Error {
    /// A relation was rejected; see [`Error::invalid_relation`].
    InvalidRelation(String),
    /// Description of a DP compilation failure.
    DPCompilationError(String),
    /// Description of a failure related to grouping keys.
    GroupingKeysError(String),
    /// Description of a failure related to the privacy budget.
    BudgetError(String),
    /// Any other failure, typically the rendered message of an error
    /// converted from another module.
    Other(String),
}

impl Error {
    /// Builds an [`Error::InvalidRelation`] whose description is
    /// `"<relation> is invalid"`.
    pub fn invalid_relation(relation: impl fmt::Display) -> Error {
        Error::InvalidRelation(format!("{relation} is invalid"))
    }
}

impl fmt::Display for Error {
    /// Writes `"<Variant>: <description>"` (the bare description for `Other`),
    /// always terminated by a newline.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // The description built by `invalid_relation` already ends with
            // "is invalid", so it is only prefixed with the variant name here,
            // like the other variants, instead of appending "invalid." again.
            Error::InvalidRelation(desc) => writeln!(f, "InvalidRelation: {}", desc),
            Error::DPCompilationError(desc) => writeln!(f, "DPCompilationError: {}", desc),
            Error::GroupingKeysError(desc) => writeln!(f, "GroupingKeysError: {}", desc),
            Error::BudgetError(desc) => writeln!(f, "BudgetError: {}", desc),
            Error::Other(err) => writeln!(f, "{}", err),
        }
    }
}
52
53impl From<expr::Error> for Error {
54 fn from(err: expr::Error) -> Self {
55 Error::Other(err.to_string())
56 }
57}
58impl From<rewriting::Error> for Error {
59 fn from(err: rewriting::Error) -> Self {
60 Error::Other(err.to_string())
61 }
62}
63impl From<privacy_unit_tracking::Error> for Error {
64 fn from(err: privacy_unit_tracking::Error) -> Self {
65 Error::Other(err.to_string())
66 }
67}
68
69impl error::Error for Error {}
70pub type Result<T> = result::Result<T, Error>;
71
72#[derive(Clone, Debug)]
74pub struct DpRelation {
75 relation: Relation,
76 dp_event: DpEvent,
77}
78
79impl From<DpRelation> for Relation {
80 fn from(value: DpRelation) -> Self {
81 value.relation
82 }
83}
84
85impl DpRelation {
86 pub fn new(relation: Relation, dp_event: DpEvent) -> Self {
87 DpRelation { relation, dp_event }
88 }
89
90 pub fn relation(&self) -> &Relation {
91 &self.relation
92 }
93
94 pub fn dp_event(&self) -> &DpEvent {
95 &self.dp_event
96 }
97}
98
99impl Deref for DpRelation {
100 type Target = Relation;
101
102 fn deref(&self) -> &Self::Target {
103 &self.relation
104 }
105}
106
107impl From<DpRelation> for (Relation, DpEvent) {
108 fn from(value: DpRelation) -> Self {
109 (value.relation, value.dp_event)
110 }
111}
112
113impl From<(Relation, DpEvent)> for DpRelation {
114 fn from(value: (Relation, DpEvent)) -> Self {
115 DpRelation::new(value.0, value.1)
116 }
117}
118
119impl Reduce {
120 pub fn differentially_private(self, parameters: &DpParameters) -> Result<DpRelation> {
124 let mut dp_event = DpEvent::no_op();
125 let max_size = self.size().max().unwrap().clone();
126 let pup_input = PupRelation::try_from(self.input().clone())?;
127 let privacy_unit_unique =
128 pup_input.schema()[pup_input.privacy_unit()].has_unique_or_primary_key_constraint();
129
130 let reduce_with_dp_group_by = if self.group_by().is_empty() {
132 self
133 } else {
134 let (dp_grouping_values, dp_event_group_by) = self
135 .differentially_private_group_by(
136 parameters.epsilon * parameters.tau_thresholding_share,
137 parameters.delta * parameters.tau_thresholding_share,
138 parameters.max_privacy_unit_groups,
139 )?
140 .into();
141 let input_relation_with_privacy_tracked_group_by = self
142 .input()
143 .clone()
144 .join_with_grouping_values(dp_grouping_values)?;
145 let reduce: Reduce = Reduce::builder()
146 .with(self)
147 .input(input_relation_with_privacy_tracked_group_by)
148 .build();
149 dp_event = dp_event.compose(dp_event_group_by);
150 reduce
151 };
152
153 let aggregation_share = if dp_event.is_no_op() {
156 1.
157 } else {
158 1. - parameters.tau_thresholding_share
159 };
160 let aggregation_parameters =
161 DpAggregatesParameters::from_dp_parameters(parameters.clone(), aggregation_share)
162 .with_size(usize::try_from(max_size).unwrap())
163 .with_privacy_unit_unique(privacy_unit_unique);
164
165 let (dp_relation, dp_event_agg) = reduce_with_dp_group_by
167 .differentially_private_aggregates(aggregation_parameters)?
168 .into();
169 dp_event = dp_event.compose(dp_event_agg);
170 Ok((dp_relation, dp_event).into())
171 }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        ast,
        builder::With,
        data_type::{DataType, DataTyped, Variant as _},
        display::Dot,
        expr::{AggregateColumn, Expr},
        io::{postgresql, Database},
        privacy_unit_tracking::{PrivacyUnit, PrivacyUnitTracking, PupRelation, Strategy},
        relation::{Constraint, Field, Map, Relation, Schema, Variant},
    };
    use std::{collections::HashSet, sync::Arc};

    /// Rewrites two `Reduce`s without grouping keys and runs the resulting
    /// queries against the test database.
    #[test]
    fn test_dp_rewrite_reduce_without_group_by() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);

        // First case: sum of `price` over `item_table`.
        let table = relations
            .get(&["item_table".to_string()])
            .unwrap()
            .deref()
            .clone();
        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![
                (
                    "item_table",
                    vec![("order_id", "order_table", "id")],
                    "date",
                ),
                ("order_table", vec![], "date"),
            ],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.clone().try_into().unwrap())
            .unwrap();
        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
            vec![],
            pup_table.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        let _mult: f64 = 2000.
            * DpAggregatesParameters::from_dp_parameters(parameters.clone(), 1.)
                .privacy_unit_multiplicity();
        assert!(matches!(
            dp_event,
            DpEvent::Gaussian {
                noise_multiplier: _
            }
        ));
        assert!(dp_relation
            .data_type()
            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));

        // The rewritten query must run on the test database.
        let query: &str = &ast::Query::from(&dp_relation).to_string();
        _ = database.query(query).unwrap();

        // Second case: sum of `d / 100` over `table_1`.
        let table = relations
            .get(&["table_1".to_string()])
            .unwrap()
            .deref()
            .clone();
        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![("table_1", vec![], PrivacyUnit::privacy_unit_row())],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.clone().try_into().unwrap())
            .unwrap();
        let map = Map::new(
            "my_map".to_string(),
            vec![("my_d".to_string(), expr!(d / 100))],
            None,
            vec![],
            None,
            None,
            Arc::new(table.into()),
        );
        let pup_map = privacy_unit_tracking
            .map(
                &map.clone().try_into().unwrap(),
                PupRelation(Relation::from(pup_table)),
            )
            .unwrap();
        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![("sum_d".to_string(), AggregateColumn::sum("my_d"))],
            vec![],
            pup_map.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        // Here the rewriting is expected to spend nothing and to output the constant 0.
        assert!(dp_event.is_no_op());
        assert_eq!(
            dp_relation.data_type(),
            DataType::structured([("sum_d", DataType::float_value(0.))])
        );

        let query: &str = &ast::Query::from(&dp_relation).to_string();
        _ = database.query(query).unwrap();
    }

    /// Grouping key restricted by an `IN` filter: the resulting event is
    /// expected to be a single Gaussian event.
    #[test]
    fn test_dp_rewrite_reduce_group_by_possible_values() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["item_table".to_string()])
            .unwrap()
            .deref()
            .clone();
        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);

        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![
                (
                    "item_table",
                    vec![("order_id", "order_table", "id")],
                    "date",
                ),
                ("order_table", vec![], "date"),
            ],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.try_into().unwrap())
            .unwrap();
        let map: Map = Relation::map()
            .with(("order_id", expr!(order_id)))
            .with(("price", expr!(price)))
            .filter(Expr::in_list(
                Expr::col("order_id"),
                Expr::list(vec![1, 2, 3, 4, 5]),
            ))
            .input(pup_table.deref().clone())
            .build();
        let pup_map = privacy_unit_tracking
            .map(&map.try_into().unwrap(), pup_table)
            .unwrap();

        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
            vec!["order_id".into()],
            pup_map.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        assert!(matches!(
            dp_event,
            DpEvent::Gaussian {
                noise_multiplier: _
            }
        ));
        assert!(dp_relation
            .data_type()
            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));

        let query: &str = &ast::Query::from(&dp_relation).to_string();
        println!("{query}");
        _ = database.query(query).unwrap();
    }

    /// Unfiltered grouping key: the resulting event is expected to be a
    /// composition of events.
    #[test]
    fn test_dp_rewrite_reduce_group_by_tau_thresholding() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["item_table".to_string()])
            .unwrap()
            .deref()
            .clone();
        let parameters =
            DpParameters::from_epsilon_delta(100., 1e-3).with_max_privacy_unit_groups(10);

        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![
                (
                    "item_table",
                    vec![("order_id", "order_table", "id")],
                    "date",
                ),
                ("order_table", vec![], "date"),
            ],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.try_into().unwrap())
            .unwrap();
        let map: Map = Relation::map()
            .with(("order_id", expr!(order_id)))
            .with(("price", expr!(price)))
            .input(pup_table.deref().clone())
            .build();
        let pup_map = privacy_unit_tracking
            .map(&map.try_into().unwrap(), pup_table)
            .unwrap();

        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![("sum_price".to_string(), AggregateColumn::sum("price"))],
            vec!["order_id".into()],
            pup_map.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        assert!(matches!(dp_event, DpEvent::Composed { events: _ }));
        assert!(dp_relation
            .data_type()
            .is_subset_of(&DataType::structured([("sum_price", DataType::float())])));

        let query: &str = &ast::Query::from(&dp_relation).to_string();
        println!("{query}");
        let res = database.query(query).unwrap();
        println!("\n{:?}", res);
    }

    /// Two grouping keys, only one of which (`order_id`) is restricted by an
    /// `IN` filter: the resulting event is expected to be a composition.
    #[test]
    fn test_dp_rewrite_reduce_group_by_possible_both() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["item_table".to_string()])
            .unwrap()
            .deref()
            .clone();
        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);

        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![
                (
                    "item_table",
                    vec![("order_id", "order_table", "id")],
                    "date",
                ),
                ("order_table", vec![], "date"),
            ],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.try_into().unwrap())
            .unwrap();
        let map: Map = Relation::map()
            .with(("order_id", expr!(order_id)))
            .with(("item", expr!(item)))
            .with(("price", expr!(price)))
            .filter(Expr::in_list(
                Expr::col("order_id"),
                Expr::list(vec![1, 2, 3, 4, 5]),
            ))
            .input(pup_table.deref().clone())
            .build();
        let pup_map = privacy_unit_tracking
            .map(&map.try_into().unwrap(), pup_table)
            .unwrap();

        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![
                ("item".to_string(), AggregateColumn::first("item")),
                ("order_id".to_string(), AggregateColumn::first("order_id")),
                ("sum_price".to_string(), AggregateColumn::sum("price")),
            ],
            vec!["order_id".into(), "item".into()],
            pup_map.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        assert!(matches!(dp_event, DpEvent::Composed { events: _ }));
        assert!(dp_relation.schema()[0]
            .data_type()
            .is_subset_of(&DataType::text()));
        assert_eq!(
            dp_relation.schema()[1].data_type(),
            DataType::integer_values(vec![1, 2, 3, 4, 5])
        );
        assert!(dp_relation.schema()[2]
            .data_type()
            .is_subset_of(&DataType::float()));

        let query: &str = &ast::Query::from(&dp_relation).to_string();
        println!("{query}");
        _ = database.query(query).unwrap();
    }

    /// One grouping key (`city`) filtered on two values: the query results
    /// must contain exactly these two keys.
    #[test]
    fn test_differentially_private_output_all_grouping_keys_simple() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["large_user_table".into()])
            .unwrap()
            .as_ref()
            .clone();
        // Same schema as the stored table, except that `city` is typed as plain text.
        let new_schema: Schema = table
            .schema()
            .iter()
            .map(|f| {
                if f.name() == "city" {
                    Field::from_name_data_type("city", DataType::text())
                } else {
                    f.clone()
                }
            })
            .collect();
        let table: Relation = Relation::table()
            .path(["large_user_table"])
            .name("more_users")
            .size(100000)
            .schema(new_schema)
            .build();
        let input: Relation = Relation::map()
            .name("map_relation")
            .with(("income", expr!(income)))
            .with(("city", expr!(city)))
            .with((PrivacyUnit::privacy_unit(), expr!(id)))
            .with((PrivacyUnit::privacy_unit_weight(), expr!(id)))
            .filter(Expr::in_list(
                Expr::col("city"),
                Expr::list(vec!["Paris".to_string(), "London".to_string()]),
            ))
            .input(table.clone())
            .build();
        let reduce: Reduce = Relation::reduce()
            .name("reduce_relation")
            .with(("city".to_string(), AggregateColumn::first("city")))
            .with(("count_income".to_string(), AggregateColumn::count("income")))
            .group_by(expr!(city))
            .input(input)
            .build();
        let (dp_relation, dp_event) = reduce
            .differentially_private(&DpParameters::from_epsilon_delta(10., 1e-5))
            .unwrap()
            .into();
        println!("{}", dp_event);
        dp_relation.display_dot().unwrap();
        let query: &str = &ast::Query::from(&dp_relation).to_string();
        let results = database.query(query).unwrap();
        println!("results = {:?}", results);
        // The first output column holds the city key.
        let city_keys: HashSet<_> = results
            .iter()
            .map(|row| row.to_vec().clone()[0].clone().to_string())
            .collect();
        let correct_keys: HashSet<_> = vec!["London".to_string(), "Paris".to_string()]
            .into_iter()
            .collect();
        assert_eq!(city_keys, correct_keys);
    }

    /// Two grouping keys (`city`, `age`) with `city` filtered on two values:
    /// the set of cities in the query results must be exactly these two values.
    #[test]
    fn test_differentially_private_output_all_grouping_keys() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["large_user_table".into()])
            .unwrap()
            .as_ref()
            .clone();
        // Same schema as the stored table, except that `city` is typed as plain text.
        let new_schema: Schema = table
            .schema()
            .iter()
            .map(|f| {
                if f.name() == "city" {
                    Field::from_name_data_type("city", DataType::text())
                } else {
                    f.clone()
                }
            })
            .collect();
        let table: Relation = Relation::table()
            .path(["large_user_table"])
            .name("more_users")
            .size(100000)
            .schema(new_schema)
            .build();
        let input: Relation = Relation::map()
            .name("map_relation")
            .with(("income", expr!(income)))
            .with(("city", expr!(city)))
            .with(("age", expr!(age)))
            .with((PrivacyUnit::privacy_unit(), expr!(id)))
            .with((PrivacyUnit::privacy_unit_weight(), expr!(id)))
            .filter(Expr::in_list(
                Expr::col("city"),
                Expr::list(vec!["Paris".to_string(), "London".to_string()]),
            ))
            .input(table.clone())
            .build();
        let reduce: Reduce = Relation::reduce()
            .name("reduce_relation")
            .with(("city".to_string(), AggregateColumn::first("city")))
            .with(("age".to_string(), AggregateColumn::first("age")))
            .with(("sum_income".to_string(), AggregateColumn::sum("income")))
            .group_by(expr!(city))
            .group_by(expr!(age))
            .input(input)
            .build();
        let (dp_relation, dp_event) = reduce
            .differentially_private(&DpParameters::from_epsilon_delta(10., 1e-5))
            .unwrap()
            .into();
        println!("{}", dp_event);
        dp_relation.display_dot().unwrap();
        let query: &str = &ast::Query::from(&dp_relation).to_string();
        let results = database.query(query).unwrap();
        println!("{:?}", results);
        // The first output column holds the city key.
        let city_keys: HashSet<_> = results
            .iter()
            .map(|row| row.to_vec().clone()[0].clone().to_string())
            .collect();
        let correct_keys: HashSet<_> = vec!["London".to_string(), "Paris".to_string()]
            .into_iter()
            .collect();
        assert_eq!(city_keys, correct_keys);
    }

    /// Checks the exact event and output schema for a grouped `Reduce` over
    /// `table_1`.
    #[test]
    fn test_dp_rewrite_reduce() {
        let mut database = postgresql::test_database();
        let relations = database.relations();
        let table = relations
            .get(&["table_1".to_string()])
            .unwrap()
            .deref()
            .clone();
        let parameters = DpParameters::from_epsilon_delta(1., 1e-3);

        let privacy_unit_tracking = PrivacyUnitTracking::from((
            &relations,
            vec![("table_1", vec![], PrivacyUnit::privacy_unit_row())],
            Strategy::Hard,
        ));
        let pup_table = privacy_unit_tracking
            .table(&table.try_into().unwrap())
            .unwrap();
        let reduce = Reduce::new(
            "my_reduce".to_string(),
            vec![
                ("sum_a".to_string(), AggregateColumn::sum("a")),
                ("d".to_string(), AggregateColumn::first("d")),
                ("max_d".to_string(), AggregateColumn::max("d")),
            ],
            vec!["d".into()],
            pup_table.deref().clone().into(),
        );
        let relation = Relation::from(reduce.clone());
        relation.display_dot().unwrap();

        let (dp_relation, dp_event) = reduce.differentially_private(&parameters).unwrap().into();
        dp_relation.display_dot().unwrap();
        // The budget is split between the grouping keys and the aggregates
        // according to `tau_thresholding_share`.
        assert_eq!(
            dp_event,
            DpEvent::epsilon_delta(
                parameters.epsilon * parameters.tau_thresholding_share,
                parameters.delta * parameters.tau_thresholding_share
            )
            .compose(DpEvent::gaussian_from_epsilon_delta(
                parameters.epsilon * (1. - parameters.tau_thresholding_share),
                parameters.delta * (1. - parameters.tau_thresholding_share),
            ))
        );
        let correct_schema: Schema = vec![
            ("sum_a", DataType::float_interval(0., 100.), None),
            (
                "d",
                DataType::integer_interval(0, 10),
                Some(Constraint::Unique),
            ),
            (
                "max_d",
                DataType::integer_interval(0, 10),
                Some(Constraint::Unique),
            ),
        ]
        .into_iter()
        .collect();
        assert_eq!(dp_relation.schema(), &correct_schema);

        let query: &str = &ast::Query::from(&dp_relation).to_string();
        println!("{query}");
        _ = database.query(query).unwrap();
    }
}