1use crate::time::PhysicalTime;
16use crate::types::identifiers::ContextId;
17use crate::{AuthorityId, Result};
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
23pub enum ObserverClass {
24 External,
26 Neighbor,
28 InGroup,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LeakageEvent {
35 pub source: AuthorityId,
37 pub destination: AuthorityId,
39 pub context_id: ContextId,
41 pub leakage_amount: u64,
43 pub observer_class: ObserverClass,
45 pub operation: String,
47 pub timestamp: PhysicalTime,
49}
50
51impl LeakageEvent {
52 pub fn timestamp_ms(&self) -> u64 {
54 self.timestamp.ts_ms
55 }
56
57 #[must_use]
59 pub fn with_timestamp_ms(
60 source: AuthorityId,
61 destination: AuthorityId,
62 context_id: ContextId,
63 leakage_amount: u64,
64 observer_class: ObserverClass,
65 operation: String,
66 timestamp_ms: u64,
67 ) -> Self {
68 Self {
69 source,
70 destination,
71 context_id,
72 leakage_amount,
73 observer_class,
74 operation,
75 timestamp: PhysicalTime {
76 ts_ms: timestamp_ms,
77 uncertainty: None,
78 },
79 }
80 }
81}
82
83#[derive(Debug, Clone, Default, Serialize, Deserialize)]
85pub struct LeakageBudget {
86 pub external_consumed: u64,
88 pub neighbor_consumed: u64,
89 pub in_group_consumed: u64,
90}
91
92impl LeakageBudget {
93 pub fn zero() -> Self {
95 Self::default()
96 }
97
98 pub fn add(&self, other: &LeakageBudget) -> Self {
100 Self {
101 external_consumed: self.external_consumed + other.external_consumed,
102 neighbor_consumed: self.neighbor_consumed + other.neighbor_consumed,
103 in_group_consumed: self.in_group_consumed + other.in_group_consumed,
104 }
105 }
106
107 pub fn is_within_limits(&self, limits: &LeakageBudget) -> bool {
109 self.external_consumed <= limits.external_consumed
110 && self.neighbor_consumed <= limits.neighbor_consumed
111 && self.in_group_consumed <= limits.in_group_consumed
112 }
113
114 pub fn for_observer(&self, observer: ObserverClass) -> u64 {
116 match observer {
117 ObserverClass::External => self.external_consumed,
118 ObserverClass::Neighbor => self.neighbor_consumed,
119 ObserverClass::InGroup => self.in_group_consumed,
120 }
121 }
122
123 pub fn set_for_observer(&mut self, observer: ObserverClass, amount: u64) {
125 match observer {
126 ObserverClass::External => self.external_consumed = amount,
127 ObserverClass::Neighbor => self.neighbor_consumed = amount,
128 ObserverClass::InGroup => self.in_group_consumed = amount,
129 }
130 }
131}
132
133#[async_trait]
135pub trait LeakageEffects: Send + Sync {
136 async fn record_leakage(&self, event: LeakageEvent) -> Result<()>;
138
139 async fn get_leakage_budget(&self, context_id: ContextId) -> Result<LeakageBudget>;
141
142 async fn check_leakage_budget(
144 &self,
145 context_id: ContextId,
146 observer: ObserverClass,
147 amount: u64,
148 ) -> Result<bool>;
149
150 async fn get_leakage_history(
156 &self,
157 context_id: ContextId,
158 since_timestamp: Option<&PhysicalTime>,
159 ) -> Result<Vec<LeakageEvent>>;
160}
161
162#[async_trait]
164pub trait LeakageChoreographyExt: LeakageEffects {
165 async fn record_send_leakage(
170 &self,
171 source: AuthorityId,
172 destination: AuthorityId,
173 context_id: ContextId,
174 flow_cost: u64,
175 observer_classes: &[ObserverClass],
176 timestamp: &PhysicalTime,
177 ) -> Result<()> {
178 for observer in observer_classes {
179 let event = LeakageEvent {
180 source,
181 destination,
182 context_id,
183 leakage_amount: flow_cost,
184 observer_class: *observer,
185 operation: "send".to_string(),
186 timestamp: timestamp.clone(),
187 };
188 self.record_leakage(event).await?;
189 }
190 Ok(())
191 }
192
193 async fn record_recv_leakage(
198 &self,
199 source: AuthorityId,
200 destination: AuthorityId,
201 context_id: ContextId,
202 flow_cost: u64,
203 observer_classes: &[ObserverClass],
204 timestamp: &PhysicalTime,
205 ) -> Result<()> {
206 for observer in observer_classes {
207 let event = LeakageEvent {
208 source,
209 destination,
210 context_id,
211 leakage_amount: flow_cost,
212 observer_class: *observer,
213 operation: "recv".to_string(),
214 timestamp: timestamp.clone(),
215 };
216 self.record_leakage(event).await?;
217 }
218 Ok(())
219 }
220}
221
222impl<T: LeakageEffects> LeakageChoreographyExt for T {}
224
225#[async_trait]
227impl<T: LeakageEffects + ?Sized> LeakageEffects for std::sync::Arc<T> {
228 async fn record_leakage(&self, event: LeakageEvent) -> Result<()> {
229 (**self).record_leakage(event).await
230 }
231
232 async fn get_leakage_budget(&self, context_id: ContextId) -> Result<LeakageBudget> {
233 (**self).get_leakage_budget(context_id).await
234 }
235
236 async fn check_leakage_budget(
237 &self,
238 context_id: ContextId,
239 observer: ObserverClass,
240 amount: u64,
241 ) -> Result<bool> {
242 (**self)
243 .check_leakage_budget(context_id, observer, amount)
244 .await
245 }
246
247 async fn get_leakage_history(
248 &self,
249 context_id: ContextId,
250 since_timestamp: Option<&PhysicalTime>,
251 ) -> Result<Vec<LeakageEvent>> {
252 (**self)
253 .get_leakage_history(context_id, since_timestamp)
254 .await
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the zero constructor, the per-observer accessors and
    /// per-class addition of `LeakageBudget`.
    #[test]
    fn test_leakage_budget() {
        let mut consumed = LeakageBudget::zero();
        assert_eq!(consumed.for_observer(ObserverClass::External), 0);

        consumed.set_for_observer(ObserverClass::External, 100);
        assert_eq!(consumed.for_observer(ObserverClass::External), 100);

        let increment = LeakageBudget {
            external_consumed: 50,
            neighbor_consumed: 25,
            in_group_consumed: 10,
        };
        let total = consumed.add(&increment);

        // Each class is summed independently: only `External` had a
        // non-zero starting value.
        assert_eq!(total.external_consumed, 150);
        assert_eq!(total.neighbor_consumed, 25);
        assert_eq!(total.in_group_consumed, 10);
    }
}