1use blake2::{Blake2b, Digest, digest::consts::U32};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::path::Path;
10use thiserror::Error;
11
12use crate::types::{ChannelId, SessionKey, TokenUsage};
13
14#[derive(Error, Debug)]
16pub enum EventStoreError {
17 #[error("Storage error: {0}")]
19 Storage(#[from] sled::Error),
20
21 #[error("Serialization error: {0}")]
23 Serialization(#[from] serde_json::Error),
24
25 #[error("Event not found: {0}")]
27 NotFound(String),
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
32pub struct EventId(pub [u8; 32]);
33
34impl EventId {
35 #[must_use]
37 pub fn from_content(content: &[u8]) -> Self {
38 let mut hasher = Blake2b::<U32>::new();
39 hasher.update(content);
40 let result = hasher.finalize();
41 let mut id = [0u8; 32];
42 id.copy_from_slice(&result);
43 Self(id)
44 }
45
46 #[must_use]
48 pub fn to_hex(&self) -> String {
49 hex::encode(self.0)
50 }
51}
52
53impl std::fmt::Display for EventId {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 write!(f, "{}", &self.to_hex()[..12])
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct SessionEvent {
62 pub id: EventId,
64 pub session_key: SessionKey,
66 pub agent_id: String,
68 pub timestamp: DateTime<Utc>,
70 pub kind: SessionEventKind,
72}
73
74impl SessionEvent {
75 #[must_use]
77 pub fn new(session_key: SessionKey, agent_id: String, kind: SessionEventKind) -> Self {
78 let timestamp = Utc::now();
79 let content = format!("{session_key}:{agent_id}:{timestamp}:{kind:?}");
80 let id = EventId::from_content(content.as_bytes());
81
82 Self {
83 id,
84 session_key,
85 agent_id,
86 timestamp,
87 kind,
88 }
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
94#[serde(tag = "type", rename_all = "snake_case")]
95pub enum SessionEventKind {
96 SessionStarted {
98 channel: String,
100 peer_id: String,
102 },
103
104 MessageReceived {
106 content: String,
108 attachments: Vec<AttachmentMeta>,
110 },
111
112 MessageSent {
114 content: String,
116 message_id: String,
118 },
119
120 ToolCalled {
122 tool_name: String,
124 params: serde_json::Value,
126 },
127
128 ToolResult {
130 tool_name: String,
132 result: serde_json::Value,
134 success: bool,
136 },
137
138 AgentResponse {
140 content: String,
142 model: String,
144 tokens: TokenUsage,
146 },
147
148 SessionEnded {
150 reason: String,
152 },
153
154 StateChanged {
156 key: String,
158 value: serde_json::Value,
160 },
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct AttachmentMeta {
166 pub kind: String,
168 pub mime_type: Option<String>,
170 pub size: Option<u64>,
172}
173
174#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
176#[serde(rename_all = "lowercase")]
177pub enum SessionState {
178 Active,
180 Paused,
182 Ended,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
188pub enum SessionMessage {
189 Inbound(String),
191 Outbound(String),
193 Tool {
195 name: String,
197 result: String,
199 },
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct SessionProjection {
208 pub session_key: SessionKey,
210 pub agent_id: String,
212 pub channel: ChannelId,
214 pub peer_id: String,
216 pub state: SessionState,
218 pub message_count: u64,
220 pub last_activity: DateTime<Utc>,
222 pub messages: Vec<SessionMessage>,
224 #[serde(default)]
226 pub custom_state: std::collections::HashMap<String, serde_json::Value>,
227 pub last_event_id: Option<EventId>,
229}
230
231impl SessionProjection {
232 #[must_use]
234 pub fn new(
235 session_key: SessionKey,
236 agent_id: String,
237 channel: ChannelId,
238 peer_id: String,
239 ) -> Self {
240 Self {
241 session_key,
242 agent_id,
243 channel,
244 peer_id,
245 state: SessionState::Active,
246 message_count: 0,
247 last_activity: Utc::now(),
248 messages: Vec::new(),
249 custom_state: std::collections::HashMap::new(),
250 last_event_id: None,
251 }
252 }
253
254 pub fn apply(&mut self, event: &SessionEvent) {
256 match &event.kind {
257 SessionEventKind::SessionStarted { .. } => {
258 self.state = SessionState::Active;
259 }
260 SessionEventKind::MessageReceived { content, .. } => {
261 self.messages.push(SessionMessage::Inbound(content.clone()));
262 self.message_count += 1;
263 }
264 SessionEventKind::MessageSent { content, .. } => {
265 self.messages
266 .push(SessionMessage::Outbound(content.clone()));
267 }
268 SessionEventKind::ToolCalled { tool_name, .. } => {
269 tracing::debug!(tool = %tool_name, "Tool called");
271 }
272 SessionEventKind::ToolResult {
273 tool_name, result, ..
274 } => {
275 let result_str = serde_json::to_string(result).unwrap_or_default();
276 self.messages.push(SessionMessage::Tool {
277 name: tool_name.clone(),
278 result: result_str,
279 });
280 }
281 SessionEventKind::AgentResponse { content, .. } => {
282 self.messages
283 .push(SessionMessage::Outbound(content.clone()));
284 }
285 SessionEventKind::SessionEnded { .. } => {
286 self.state = SessionState::Ended;
287 }
288 SessionEventKind::StateChanged { key, value } => {
289 self.custom_state.insert(key.clone(), value.clone());
290 }
291 }
292
293 self.last_activity = event.timestamp;
294 self.last_event_id = Some(event.id.clone());
295 }
296
297 pub fn merge(&mut self, other: &Self) {
301 if other.last_activity > self.last_activity {
303 self.state = other.state;
304 self.last_activity = other.last_activity;
305 self.last_event_id = other.last_event_id.clone();
306 }
307
308 if other.messages.len() > self.messages.len() {
311 self.messages = other.messages.clone();
312 self.message_count = other.message_count;
313 }
314
315 for (key, value) in &other.custom_state {
317 self.custom_state.insert(key.clone(), value.clone());
318 }
319 }
320}
321
322pub struct EventStore {
324 db: sled::Db,
325 events_tree: sled::Tree,
326 sessions_tree: sled::Tree,
327}
328
329impl EventStore {
330 pub fn open(path: &Path) -> Result<Self, EventStoreError> {
336 let db = sled::open(path)?;
337 let events_tree = db.open_tree("events")?;
338 let sessions_tree = db.open_tree("sessions")?;
339
340 Ok(Self {
341 db,
342 events_tree,
343 sessions_tree,
344 })
345 }
346
347 pub fn append(&self, event: &SessionEvent) -> Result<EventId, EventStoreError> {
353 let event_key = format!("{}:{}", event.session_key, event.id.to_hex());
354 let event_data = serde_json::to_vec(event)?;
355
356 self.events_tree.insert(event_key.as_bytes(), event_data)?;
357
358 self.update_projection(event)?;
360
361 Ok(event.id.clone())
362 }
363
364 pub fn get_events(
370 &self,
371 session_key: &SessionKey,
372 ) -> Result<Vec<SessionEvent>, EventStoreError> {
373 let prefix = format!("{session_key}:");
374 let mut events = Vec::new();
375
376 for result in self.events_tree.scan_prefix(prefix.as_bytes()) {
377 let (_, value) = result?;
378 let event: SessionEvent = serde_json::from_slice(&value)?;
379 events.push(event);
380 }
381
382 events.sort_by_key(|e| e.timestamp);
384 Ok(events)
385 }
386
387 pub fn get_events_since(
393 &self,
394 session_key: &SessionKey,
395 since: DateTime<Utc>,
396 ) -> Result<Vec<SessionEvent>, EventStoreError> {
397 let events = self.get_events(session_key)?;
398 Ok(events.into_iter().filter(|e| e.timestamp > since).collect())
399 }
400
401 pub fn get_projection(
407 &self,
408 session_key: &SessionKey,
409 ) -> Result<SessionProjection, EventStoreError> {
410 let key = session_key.as_ref().as_bytes();
411
412 match self.sessions_tree.get(key)? {
413 Some(data) => Ok(serde_json::from_slice(&data)?),
414 None => Err(EventStoreError::NotFound(session_key.to_string())),
415 }
416 }
417
418 pub fn list_sessions(&self) -> Result<Vec<SessionKey>, EventStoreError> {
424 let mut sessions = Vec::new();
425
426 for result in &self.sessions_tree {
427 let (key, _) = result?;
428 if let Ok(key_str) = std::str::from_utf8(&key) {
429 sessions.push(SessionKey::new(key_str));
430 }
431 }
432
433 Ok(sessions)
434 }
435
436 fn update_projection(&self, event: &SessionEvent) -> Result<(), EventStoreError> {
438 let key = event.session_key.as_ref().as_bytes();
439
440 let mut projection = match self.sessions_tree.get(key)? {
441 Some(data) => serde_json::from_slice(&data)?,
442 None => {
443 if let SessionEventKind::SessionStarted { channel, peer_id } = &event.kind {
445 SessionProjection::new(
446 event.session_key.clone(),
447 event.agent_id.clone(),
448 ChannelId::new(channel),
449 peer_id.clone(),
450 )
451 } else {
452 SessionProjection::new(
454 event.session_key.clone(),
455 event.agent_id.clone(),
456 ChannelId::new("unknown"),
457 "unknown".to_string(),
458 )
459 }
460 }
461 };
462
463 projection.apply(event);
464
465 let projection_data = serde_json::to_vec(&projection)?;
466 self.sessions_tree.insert(key, projection_data)?;
467
468 Ok(())
469 }
470
471 pub fn flush(&self) -> Result<(), EventStoreError> {
477 self.db.flush()?;
478 Ok(())
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485 use crate::types::AgentId;
486 use tempfile::tempdir;
487
488 #[test]
489 fn test_event_id_generation() {
490 let id1 = EventId::from_content(b"test content");
491 let id2 = EventId::from_content(b"test content");
492 let id3 = EventId::from_content(b"different content");
493
494 assert_eq!(id1, id2);
495 assert_ne!(id1, id3);
496 }
497
498 #[test]
499 fn test_session_event_creation() {
500 let session_key = SessionKey::new("test-session");
501 let event = SessionEvent::new(
502 session_key.clone(),
503 "agent1".to_string(),
504 SessionEventKind::MessageReceived {
505 content: "Hello".to_string(),
506 attachments: vec![],
507 },
508 );
509
510 assert_eq!(event.session_key, session_key);
511 assert_eq!(event.agent_id, "agent1");
512 }
513
514 #[test]
515 fn test_projection_apply() {
516 let mut projection = SessionProjection::new(
517 SessionKey::new("test"),
518 "agent".to_string(),
519 ChannelId::telegram(),
520 "user123".to_string(),
521 );
522
523 let event = SessionEvent::new(
524 SessionKey::new("test"),
525 "agent".to_string(),
526 SessionEventKind::MessageReceived {
527 content: "Hello".to_string(),
528 attachments: vec![],
529 },
530 );
531
532 projection.apply(&event);
533
534 assert_eq!(projection.message_count, 1);
535 assert_eq!(projection.messages.len(), 1);
536 }
537
538 #[test]
539 fn test_event_store_roundtrip() {
540 let temp = tempdir().unwrap();
541 let store = EventStore::open(temp.path()).unwrap();
542
543 let session_key = SessionKey::build(
544 &AgentId::default_agent(),
545 &ChannelId::telegram(),
546 "bot123",
547 crate::types::PeerType::Dm,
548 &crate::types::PeerId::new("user456"),
549 );
550
551 let start_event = SessionEvent::new(
553 session_key.clone(),
554 "default".to_string(),
555 SessionEventKind::SessionStarted {
556 channel: "telegram".to_string(),
557 peer_id: "user456".to_string(),
558 },
559 );
560 store.append(&start_event).unwrap();
561
562 let msg_event = SessionEvent::new(
564 session_key.clone(),
565 "default".to_string(),
566 SessionEventKind::MessageReceived {
567 content: "Hello, agent!".to_string(),
568 attachments: vec![],
569 },
570 );
571 store.append(&msg_event).unwrap();
572
573 let events = store.get_events(&session_key).unwrap();
575 assert_eq!(events.len(), 2);
576
577 let projection = store.get_projection(&session_key).unwrap();
579 assert_eq!(projection.message_count, 1);
580 assert_eq!(projection.state, SessionState::Active);
581 }
582}