1use crate::context::ContextId;
7use crate::errors::SisterError;
8use crate::grounding::EvidenceType;
9use crate::types::{SisterType, Status, UniqueId};
10use chrono::{DateTime, Utc};
11use serde::{Deserialize, Serialize};
12use std::time::Duration;
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct EventId(pub UniqueId);
18
19impl EventId {
20 pub fn new() -> Self {
21 Self(UniqueId::new())
22 }
23}
24
25impl Default for EventId {
26 fn default() -> Self {
27 Self::new()
28 }
29}
30
31impl std::fmt::Display for EventId {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 write!(f, "evt_{}", self.0)
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39#[serde(tag = "event_type", rename_all = "snake_case")]
40pub enum EventType {
41 Ready,
46
47 ShuttingDown,
49
50 StatusChanged { from: Status, to: Status },
52
53 ContextCreated { context_id: ContextId, name: String },
58
59 ContextSwitched { from: ContextId, to: ContextId },
61
62 ContextDeleted { context_id: ContextId },
64
65 OperationStarted {
70 operation_id: String,
71 operation_type: String,
72 },
73
74 OperationCompleted {
76 operation_id: String,
77 #[serde(with = "duration_millis")]
78 duration: Duration,
79 },
80
81 OperationFailed {
83 operation_id: String,
84 error_code: String,
85 error_message: String,
86 },
87
88 EvidenceCreated {
93 evidence_id: String,
94 evidence_type: EvidenceType,
95 },
96
97 GroundingPerformed {
99 grounding_id: String,
100 grounded: bool,
101 confidence: f64,
102 },
103
104 MemoryPressure { usage_percent: f64 },
109
110 StoragePressure { usage_percent: f64 },
112
113 Custom {
118 name: String,
119 data: serde_json::Value,
120 },
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SisterEvent {
126 pub id: EventId,
128
129 pub sister_type: SisterType,
131
132 #[serde(flatten)]
134 pub event_type: EventType,
135
136 pub timestamp: DateTime<Utc>,
138
139 #[serde(skip_serializing_if = "Option::is_none")]
141 pub context_id: Option<ContextId>,
142}
143
144impl SisterEvent {
145 pub fn new(sister_type: SisterType, event_type: EventType) -> Self {
147 Self {
148 id: EventId::new(),
149 sister_type,
150 event_type,
151 timestamp: Utc::now(),
152 context_id: None,
153 }
154 }
155
156 pub fn in_context(mut self, context_id: ContextId) -> Self {
158 self.context_id = Some(context_id);
159 self
160 }
161
162 pub fn ready(sister_type: SisterType) -> Self {
165 Self::new(sister_type, EventType::Ready)
166 }
167
168 pub fn shutting_down(sister_type: SisterType) -> Self {
169 Self::new(sister_type, EventType::ShuttingDown)
170 }
171
172 pub fn status_changed(sister_type: SisterType, from: Status, to: Status) -> Self {
173 Self::new(sister_type, EventType::StatusChanged { from, to })
174 }
175
176 pub fn context_created(sister_type: SisterType, context_id: ContextId, name: String) -> Self {
177 Self::new(sister_type, EventType::ContextCreated { context_id, name })
178 }
179
180 pub fn context_switched(sister_type: SisterType, from: ContextId, to: ContextId) -> Self {
181 Self::new(sister_type, EventType::ContextSwitched { from, to })
182 }
183
184 pub fn operation_started(
185 sister_type: SisterType,
186 operation_id: impl Into<String>,
187 operation_type: impl Into<String>,
188 ) -> Self {
189 Self::new(
190 sister_type,
191 EventType::OperationStarted {
192 operation_id: operation_id.into(),
193 operation_type: operation_type.into(),
194 },
195 )
196 }
197
198 pub fn operation_completed(
199 sister_type: SisterType,
200 operation_id: impl Into<String>,
201 duration: Duration,
202 ) -> Self {
203 Self::new(
204 sister_type,
205 EventType::OperationCompleted {
206 operation_id: operation_id.into(),
207 duration,
208 },
209 )
210 }
211
212 pub fn operation_failed(
213 sister_type: SisterType,
214 operation_id: impl Into<String>,
215 error: &SisterError,
216 ) -> Self {
217 Self::new(
218 sister_type,
219 EventType::OperationFailed {
220 operation_id: operation_id.into(),
221 error_code: error.code.to_string(),
222 error_message: error.message.clone(),
223 },
224 )
225 }
226
227 pub fn evidence_created(
228 sister_type: SisterType,
229 evidence_id: impl Into<String>,
230 evidence_type: EvidenceType,
231 ) -> Self {
232 Self::new(
233 sister_type,
234 EventType::EvidenceCreated {
235 evidence_id: evidence_id.into(),
236 evidence_type,
237 },
238 )
239 }
240
241 pub fn grounding_performed(
242 sister_type: SisterType,
243 grounding_id: impl Into<String>,
244 grounded: bool,
245 confidence: f64,
246 ) -> Self {
247 Self::new(
248 sister_type,
249 EventType::GroundingPerformed {
250 grounding_id: grounding_id.into(),
251 grounded,
252 confidence,
253 },
254 )
255 }
256}
257
258#[derive(Debug, Clone, Default)]
260pub struct EventFilter {
261 pub sister_type: Option<SisterType>,
263
264 pub event_types: Option<Vec<String>>,
266
267 pub context_id: Option<ContextId>,
269}
270
271impl EventFilter {
272 pub fn new() -> Self {
273 Self::default()
274 }
275
276 pub fn for_sister(mut self, sister_type: SisterType) -> Self {
277 self.sister_type = Some(sister_type);
278 self
279 }
280
281 pub fn in_context(mut self, context_id: ContextId) -> Self {
282 self.context_id = Some(context_id);
283 self
284 }
285
286 pub fn matches(&self, event: &SisterEvent) -> bool {
288 if let Some(st) = &self.sister_type {
289 if event.sister_type != *st {
290 return false;
291 }
292 }
293
294 if let Some(ctx) = &self.context_id {
295 if event.context_id.as_ref() != Some(ctx) {
296 return false;
297 }
298 }
299
300 true
301 }
302}
303
304pub type EventReceiver = broadcast::Receiver<SisterEvent>;
306
307pub type EventSender = broadcast::Sender<SisterEvent>;
309
310pub trait EventEmitter {
312 fn subscribe(&self, filter: EventFilter) -> EventReceiver;
314
315 fn recent_events(&self, limit: usize) -> Vec<SisterEvent>;
317
318 fn emit(&self, event: SisterEvent);
320}
321
322pub struct EventManager {
324 sender: EventSender,
325 recent: std::sync::Mutex<Vec<SisterEvent>>,
326 max_recent: usize,
327}
328
329impl EventManager {
330 pub fn new(capacity: usize) -> Self {
332 let (sender, _) = broadcast::channel(capacity);
333 Self {
334 sender,
335 recent: std::sync::Mutex::new(Vec::new()),
336 max_recent: 100,
337 }
338 }
339
340 pub fn emit(&self, event: SisterEvent) {
342 {
344 let mut recent = self.recent.lock().unwrap();
345 recent.push(event.clone());
346 if recent.len() > self.max_recent {
347 recent.remove(0);
348 }
349 }
350
351 let _ = self.sender.send(event);
353 }
354
355 pub fn subscribe(&self) -> EventReceiver {
357 self.sender.subscribe()
358 }
359
360 pub fn recent(&self, limit: usize) -> Vec<SisterEvent> {
362 let recent = self.recent.lock().unwrap();
363 recent.iter().rev().take(limit).cloned().collect()
364 }
365}
366
367impl Default for EventManager {
368 fn default() -> Self {
369 Self::new(256)
370 }
371}
372
373mod duration_millis {
375 use serde::{Deserialize, Deserializer, Serializer};
376 use std::time::Duration;
377
378 pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
379 where
380 S: Serializer,
381 {
382 serializer.serialize_u64(duration.as_millis() as u64)
383 }
384
385 pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
386 where
387 D: Deserializer<'de>,
388 {
389 let ms = u64::deserialize(deserializer)?;
390 Ok(Duration::from_millis(ms))
391 }
392}
393
394#[cfg(test)]
395mod tests {
396 use super::*;
397
398 #[test]
399 fn test_event_creation() {
400 let event = SisterEvent::ready(SisterType::Memory);
401 assert!(matches!(event.event_type, EventType::Ready));
402 assert_eq!(event.sister_type, SisterType::Memory);
403 }
404
405 #[test]
406 fn test_event_filter() {
407 let event = SisterEvent::ready(SisterType::Memory);
408 let filter = EventFilter::new().for_sister(SisterType::Memory);
409 assert!(filter.matches(&event));
410
411 let filter2 = EventFilter::new().for_sister(SisterType::Vision);
412 assert!(!filter2.matches(&event));
413 }
414
415 #[test]
416 fn test_event_manager() {
417 let manager = EventManager::new(10);
418
419 manager.emit(SisterEvent::ready(SisterType::Memory));
420 manager.emit(SisterEvent::ready(SisterType::Vision));
421
422 let recent = manager.recent(10);
423 assert_eq!(recent.len(), 2);
424 }
425}