Skip to main content

agentic_contracts/
events.rs

1//! Event emission trait for observability.
2//!
3//! All sisters emit standardized events that Hydra can subscribe to
4//! for monitoring, logging, and orchestration.
5
6use crate::context::ContextId;
7use crate::errors::SisterError;
8use crate::grounding::EvidenceType;
9use crate::types::{SisterType, Status, UniqueId};
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::time::Duration;
13use tokio::sync::broadcast;
14
15/// Unique event identifier.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct EventId(pub UniqueId);
18
19impl EventId {
20    pub fn new() -> Self {
21        Self(UniqueId::new())
22    }
23}
24
25impl Default for EventId {
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl std::fmt::Display for EventId {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        write!(f, "evt_{}", self.0)
34    }
35}
36
37/// Event types that ALL sisters emit.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "event_type", rename_all = "snake_case")]
40pub enum EventType {
41    // ═══════════════════════════════════════════════════════
42    // LIFECYCLE EVENTS
43    // ═══════════════════════════════════════════════════════
44    /// Sister initialized and ready.
45    Ready,
46
47    /// Sister shutting down.
48    ShuttingDown,
49
50    /// Sister status changed.
51    StatusChanged { from: Status, to: Status },
52
53    // ═══════════════════════════════════════════════════════
54    // CONTEXT EVENTS
55    // ═══════════════════════════════════════════════════════
56    /// Context created.
57    ContextCreated { context_id: ContextId, name: String },
58
59    /// Context switched.
60    ContextSwitched { from: ContextId, to: ContextId },
61
62    /// Context deleted.
63    ContextDeleted { context_id: ContextId },
64
65    // ═══════════════════════════════════════════════════════
66    // OPERATION EVENTS
67    // ═══════════════════════════════════════════════════════
68    /// Operation started.
69    OperationStarted {
70        operation_id: String,
71        operation_type: String,
72    },
73
74    /// Operation completed successfully.
75    OperationCompleted {
76        operation_id: String,
77        #[serde(with = "duration_millis")]
78        duration: Duration,
79    },
80
81    /// Operation failed.
82    OperationFailed {
83        operation_id: String,
84        error_code: String,
85        error_message: String,
86    },
87
88    // ═══════════════════════════════════════════════════════
89    // EVIDENCE EVENTS
90    // ═══════════════════════════════════════════════════════
91    /// Evidence created.
92    EvidenceCreated {
93        evidence_id: String,
94        evidence_type: EvidenceType,
95    },
96
97    /// Grounding performed.
98    GroundingPerformed {
99        grounding_id: String,
100        grounded: bool,
101        confidence: f64,
102    },
103
104    // ═══════════════════════════════════════════════════════
105    // RESOURCE EVENTS
106    // ═══════════════════════════════════════════════════════
107    /// Memory pressure warning.
108    MemoryPressure { usage_percent: f64 },
109
110    /// Storage pressure warning.
111    StoragePressure { usage_percent: f64 },
112
113    // ═══════════════════════════════════════════════════════
114    // CUSTOM EVENTS
115    // ═══════════════════════════════════════════════════════
116    /// Sister-specific custom event.
117    Custom {
118        name: String,
119        data: serde_json::Value,
120    },
121}
122
123/// Event emitted by a sister.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SisterEvent {
126    /// Unique event ID.
127    pub id: EventId,
128
129    /// Which sister emitted this.
130    pub sister_type: SisterType,
131
132    /// Event type and data.
133    #[serde(flatten)]
134    pub event_type: EventType,
135
136    /// Timestamp.
137    pub timestamp: DateTime<Utc>,
138
139    /// Context this event occurred in (if applicable).
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub context_id: Option<ContextId>,
142}
143
144impl SisterEvent {
145    /// Create a new event.
146    pub fn new(sister_type: SisterType, event_type: EventType) -> Self {
147        Self {
148            id: EventId::new(),
149            sister_type,
150            event_type,
151            timestamp: Utc::now(),
152            context_id: None,
153        }
154    }
155
156    /// Add context ID.
157    pub fn in_context(mut self, context_id: ContextId) -> Self {
158        self.context_id = Some(context_id);
159        self
160    }
161
162    // Event constructors
163
164    pub fn ready(sister_type: SisterType) -> Self {
165        Self::new(sister_type, EventType::Ready)
166    }
167
168    pub fn shutting_down(sister_type: SisterType) -> Self {
169        Self::new(sister_type, EventType::ShuttingDown)
170    }
171
172    pub fn status_changed(sister_type: SisterType, from: Status, to: Status) -> Self {
173        Self::new(sister_type, EventType::StatusChanged { from, to })
174    }
175
176    pub fn context_created(sister_type: SisterType, context_id: ContextId, name: String) -> Self {
177        Self::new(sister_type, EventType::ContextCreated { context_id, name })
178    }
179
180    pub fn context_switched(sister_type: SisterType, from: ContextId, to: ContextId) -> Self {
181        Self::new(sister_type, EventType::ContextSwitched { from, to })
182    }
183
184    pub fn operation_started(
185        sister_type: SisterType,
186        operation_id: impl Into<String>,
187        operation_type: impl Into<String>,
188    ) -> Self {
189        Self::new(
190            sister_type,
191            EventType::OperationStarted {
192                operation_id: operation_id.into(),
193                operation_type: operation_type.into(),
194            },
195        )
196    }
197
198    pub fn operation_completed(
199        sister_type: SisterType,
200        operation_id: impl Into<String>,
201        duration: Duration,
202    ) -> Self {
203        Self::new(
204            sister_type,
205            EventType::OperationCompleted {
206                operation_id: operation_id.into(),
207                duration,
208            },
209        )
210    }
211
212    pub fn operation_failed(
213        sister_type: SisterType,
214        operation_id: impl Into<String>,
215        error: &SisterError,
216    ) -> Self {
217        Self::new(
218            sister_type,
219            EventType::OperationFailed {
220                operation_id: operation_id.into(),
221                error_code: error.code.to_string(),
222                error_message: error.message.clone(),
223            },
224        )
225    }
226
227    pub fn evidence_created(
228        sister_type: SisterType,
229        evidence_id: impl Into<String>,
230        evidence_type: EvidenceType,
231    ) -> Self {
232        Self::new(
233            sister_type,
234            EventType::EvidenceCreated {
235                evidence_id: evidence_id.into(),
236                evidence_type,
237            },
238        )
239    }
240
241    pub fn grounding_performed(
242        sister_type: SisterType,
243        grounding_id: impl Into<String>,
244        grounded: bool,
245        confidence: f64,
246    ) -> Self {
247        Self::new(
248            sister_type,
249            EventType::GroundingPerformed {
250                grounding_id: grounding_id.into(),
251                grounded,
252                confidence,
253            },
254        )
255    }
256}
257
258/// Filter for subscribing to events.
259#[derive(Debug, Clone, Default)]
260pub struct EventFilter {
261    /// Filter by sister type.
262    pub sister_type: Option<SisterType>,
263
264    /// Filter by event type names.
265    pub event_types: Option<Vec<String>>,
266
267    /// Filter by context.
268    pub context_id: Option<ContextId>,
269}
270
271impl EventFilter {
272    pub fn new() -> Self {
273        Self::default()
274    }
275
276    pub fn for_sister(mut self, sister_type: SisterType) -> Self {
277        self.sister_type = Some(sister_type);
278        self
279    }
280
281    pub fn in_context(mut self, context_id: ContextId) -> Self {
282        self.context_id = Some(context_id);
283        self
284    }
285
286    /// Check if an event matches this filter.
287    pub fn matches(&self, event: &SisterEvent) -> bool {
288        if let Some(st) = &self.sister_type {
289            if event.sister_type != *st {
290                return false;
291            }
292        }
293
294        if let Some(ctx) = &self.context_id {
295            if event.context_id.as_ref() != Some(ctx) {
296                return false;
297            }
298        }
299
300        true
301    }
302}
303
304/// Event receiver (broadcast channel).
305pub type EventReceiver = broadcast::Receiver<SisterEvent>;
306
307/// Event sender (broadcast channel).
308pub type EventSender = broadcast::Sender<SisterEvent>;
309
310/// Event emitter trait for observability.
311pub trait EventEmitter {
312    /// Subscribe to events with optional filter.
313    fn subscribe(&self, filter: EventFilter) -> EventReceiver;
314
315    /// Get recent events.
316    fn recent_events(&self, limit: usize) -> Vec<SisterEvent>;
317
318    /// Emit an event (for internal use).
319    fn emit(&self, event: SisterEvent);
320}
321
322/// Helper struct for managing event emission.
323pub struct EventManager {
324    sender: EventSender,
325    recent: std::sync::Mutex<Vec<SisterEvent>>,
326    max_recent: usize,
327}
328
329impl EventManager {
330    /// Create a new event manager.
331    pub fn new(capacity: usize) -> Self {
332        let (sender, _) = broadcast::channel(capacity);
333        Self {
334            sender,
335            recent: std::sync::Mutex::new(Vec::new()),
336            max_recent: 100,
337        }
338    }
339
340    /// Emit an event.
341    pub fn emit(&self, event: SisterEvent) {
342        // Store in recent
343        {
344            let mut recent = self.recent.lock().unwrap();
345            recent.push(event.clone());
346            if recent.len() > self.max_recent {
347                recent.remove(0);
348            }
349        }
350
351        // Broadcast (ignore errors if no subscribers)
352        let _ = self.sender.send(event);
353    }
354
355    /// Subscribe to events.
356    pub fn subscribe(&self) -> EventReceiver {
357        self.sender.subscribe()
358    }
359
360    /// Get recent events.
361    pub fn recent(&self, limit: usize) -> Vec<SisterEvent> {
362        let recent = self.recent.lock().unwrap();
363        recent.iter().rev().take(limit).cloned().collect()
364    }
365}
366
367impl Default for EventManager {
368    fn default() -> Self {
369        Self::new(256)
370    }
371}
372
373// Duration serialization as milliseconds
374mod duration_millis {
375    use serde::{Deserialize, Deserializer, Serializer};
376    use std::time::Duration;
377
378    pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
379    where
380        S: Serializer,
381    {
382        serializer.serialize_u64(duration.as_millis() as u64)
383    }
384
385    pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
386    where
387        D: Deserializer<'de>,
388    {
389        let ms = u64::deserialize(deserializer)?;
390        Ok(Duration::from_millis(ms))
391    }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// The `ready` constructor records both the emitting sister and the event kind.
    #[test]
    fn test_event_creation() {
        let ready = SisterEvent::ready(SisterType::Memory);
        assert!(matches!(ready.event_type, EventType::Ready));
        assert_eq!(ready.sister_type, SisterType::Memory);
    }

    /// A sister filter accepts events from that sister and rejects events from others.
    #[test]
    fn test_event_filter() {
        let memory_event = SisterEvent::ready(SisterType::Memory);

        let memory_only = EventFilter::new().for_sister(SisterType::Memory);
        assert!(memory_only.matches(&memory_event));

        let vision_only = EventFilter::new().for_sister(SisterType::Vision);
        assert!(!vision_only.matches(&memory_event));
    }

    /// Emitted events show up in the manager's history even without subscribers.
    #[test]
    fn test_event_manager() {
        let manager = EventManager::new(10);

        manager.emit(SisterEvent::ready(SisterType::Memory));
        manager.emit(SisterEvent::ready(SisterType::Vision));

        let history = manager.recent(10);
        assert_eq!(history.len(), 2);
    }
}
425}