Skip to main content

openclaw_core/events/
mod.rs

1//! Event-sourced session storage (grite pattern).
2//!
3//! Sessions are stored as append-only event logs with CRDT projections
4//! for materialized views. Uses sled for fast local storage.
5
6use blake2::{Blake2b, Digest, digest::consts::U32};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::path::Path;
10use thiserror::Error;
11
12use crate::types::{ChannelId, SessionKey, TokenUsage};
13
/// Event store errors.
///
/// The `Storage` and `Serialization` variants carry `#[from]` conversions, so
/// `?` turns `sled::Error` and `serde_json::Error` into this type directly.
#[derive(Error, Debug)]
pub enum EventStoreError {
    /// Storage error reported by the sled database.
    #[error("Storage error: {0}")]
    Storage(#[from] sled::Error),

    /// Serialization error while encoding or decoding JSON.
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// A requested record was not found; the payload names what was looked up
    /// (`EventStore::get_projection` passes the session key).
    #[error("Event not found: {0}")]
    NotFound(String),
}
29
/// Unique event identifier (`BLAKE2b` hash).
///
/// Wraps the raw 32-byte digest; equality and hashing are derived from those
/// bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventId(pub [u8; 32]);
33
34impl EventId {
35    /// Generate event ID from content.
36    #[must_use]
37    pub fn from_content(content: &[u8]) -> Self {
38        let mut hasher = Blake2b::<U32>::new();
39        hasher.update(content);
40        let result = hasher.finalize();
41        let mut id = [0u8; 32];
42        id.copy_from_slice(&result);
43        Self(id)
44    }
45
46    /// Convert to hex string.
47    #[must_use]
48    pub fn to_hex(&self) -> String {
49        hex::encode(self.0)
50    }
51}
52
53impl std::fmt::Display for EventId {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        write!(f, "{}", &self.to_hex()[..12])
56    }
57}
58
/// A session event.
///
/// One entry in a session's append-only log. Events are serialized as JSON
/// when written by `EventStore::append`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionEvent {
    /// Unique event identifier (computed in `SessionEvent::new` from the other fields).
    pub id: EventId,
    /// Session this event belongs to.
    pub session_key: SessionKey,
    /// Agent that processed this event.
    pub agent_id: String,
    /// When the event occurred (UTC).
    pub timestamp: DateTime<Utc>,
    /// Event payload.
    pub kind: SessionEventKind,
}
73
74impl SessionEvent {
75    /// Create a new session event.
76    #[must_use]
77    pub fn new(session_key: SessionKey, agent_id: String, kind: SessionEventKind) -> Self {
78        let timestamp = Utc::now();
79        let content = format!("{session_key}:{agent_id}:{timestamp}:{kind:?}");
80        let id = EventId::from_content(content.as_bytes());
81
82        Self {
83            id,
84            session_key,
85            agent_id,
86            timestamp,
87            kind,
88        }
89    }
90}
91
/// Types of session events.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `"type"` key next to the variant's fields.
///
/// NOTE: `SessionEvent::new` hashes the `Debug` rendering of this value into
/// the event id, so renaming or reordering variants/fields changes the ids
/// computed for new events.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SessionEventKind {
    /// Session started.
    SessionStarted {
        /// Channel the session is on.
        channel: String,
        /// Peer ID.
        peer_id: String,
    },

    /// Message received from peer.
    MessageReceived {
        /// Message content.
        content: String,
        /// Attachment metadata.
        attachments: Vec<AttachmentMeta>,
    },

    /// Message sent to peer.
    MessageSent {
        /// Message content.
        content: String,
        /// Platform message ID.
        message_id: String,
    },

    /// Tool was called.
    ToolCalled {
        /// Tool name.
        tool_name: String,
        /// Tool parameters (arbitrary JSON).
        params: serde_json::Value,
    },

    /// Tool returned result.
    ToolResult {
        /// Tool name.
        tool_name: String,
        /// Tool result (arbitrary JSON).
        result: serde_json::Value,
        /// Whether tool succeeded.
        success: bool,
    },

    /// Agent produced a response.
    AgentResponse {
        /// Response content.
        content: String,
        /// Model used.
        model: String,
        /// Token usage.
        tokens: TokenUsage,
    },

    /// Session ended.
    SessionEnded {
        /// Reason for ending.
        reason: String,
    },

    /// Session state changed.
    StateChanged {
        /// State key.
        key: String,
        /// New value stored under `key`.
        value: serde_json::Value,
    },
}
162
/// Attachment metadata for events.
///
/// Carries a description of an attachment only; no attachment content is
/// stored in this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttachmentMeta {
    /// Attachment type.
    pub kind: String,
    /// MIME type, if known.
    pub mime_type: Option<String>,
    /// File size, if known (presumably in bytes — confirm with the producer).
    pub size: Option<u64>,
}
173
/// Session state for projection.
///
/// Serialized as the lowercase variant name (`"active"`, `"paused"`, `"ended"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SessionState {
    /// Session is active (initial state; also set by a `SessionStarted` event).
    Active,
    /// Session is paused. No event handled by `SessionProjection::apply` in
    /// this module sets this state.
    Paused,
    /// Session has ended (set by a `SessionEnded` event).
    Ended,
}
185
/// A message in session history.
///
/// Entries are appended by `SessionProjection::apply` in event order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SessionMessage {
    /// Message from the peer (from a `MessageReceived` event).
    Inbound(String),
    /// Message to the peer (from a `MessageSent` or `AgentResponse` event).
    Outbound(String),
    /// Tool call (from a `ToolResult` event).
    Tool {
        /// Name of the tool that was called.
        name: String,
        /// Result returned by the tool, as a JSON-encoded string.
        result: String,
    },
}
201
/// CRDT projection for session state.
///
/// This is a materialized view derived from applying events.
/// Supports CRDT merge for conflict resolution.
///
/// `EventStore` persists one projection per session as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionProjection {
    /// Session identifier.
    pub session_key: SessionKey,
    /// Agent ID.
    pub agent_id: String,
    /// Channel.
    pub channel: ChannelId,
    /// Peer ID.
    pub peer_id: String,
    /// Current state.
    pub state: SessionState,
    /// Total message count. Only `MessageReceived` events increment it, so it
    /// can be smaller than `messages.len()`.
    pub message_count: u64,
    /// Last activity timestamp (timestamp of the most recently applied event).
    pub last_activity: DateTime<Utc>,
    /// Message history.
    pub messages: Vec<SessionMessage>,
    /// Custom state key-value pairs; defaults to empty when absent from the
    /// serialized form.
    #[serde(default)]
    pub custom_state: std::collections::HashMap<String, serde_json::Value>,
    /// Last event ID applied, or `None` if no event has been applied yet.
    pub last_event_id: Option<EventId>,
}
230
231impl SessionProjection {
232    /// Create a new empty projection.
233    #[must_use]
234    pub fn new(
235        session_key: SessionKey,
236        agent_id: String,
237        channel: ChannelId,
238        peer_id: String,
239    ) -> Self {
240        Self {
241            session_key,
242            agent_id,
243            channel,
244            peer_id,
245            state: SessionState::Active,
246            message_count: 0,
247            last_activity: Utc::now(),
248            messages: Vec::new(),
249            custom_state: std::collections::HashMap::new(),
250            last_event_id: None,
251        }
252    }
253
254    /// Apply an event to update the projection.
255    pub fn apply(&mut self, event: &SessionEvent) {
256        match &event.kind {
257            SessionEventKind::SessionStarted { .. } => {
258                self.state = SessionState::Active;
259            }
260            SessionEventKind::MessageReceived { content, .. } => {
261                self.messages.push(SessionMessage::Inbound(content.clone()));
262                self.message_count += 1;
263            }
264            SessionEventKind::MessageSent { content, .. } => {
265                self.messages
266                    .push(SessionMessage::Outbound(content.clone()));
267            }
268            SessionEventKind::ToolCalled { tool_name, .. } => {
269                // Tool calls are recorded but don't add to message history yet
270                tracing::debug!(tool = %tool_name, "Tool called");
271            }
272            SessionEventKind::ToolResult {
273                tool_name, result, ..
274            } => {
275                let result_str = serde_json::to_string(result).unwrap_or_default();
276                self.messages.push(SessionMessage::Tool {
277                    name: tool_name.clone(),
278                    result: result_str,
279                });
280            }
281            SessionEventKind::AgentResponse { content, .. } => {
282                self.messages
283                    .push(SessionMessage::Outbound(content.clone()));
284            }
285            SessionEventKind::SessionEnded { .. } => {
286                self.state = SessionState::Ended;
287            }
288            SessionEventKind::StateChanged { key, value } => {
289                self.custom_state.insert(key.clone(), value.clone());
290            }
291        }
292
293        self.last_activity = event.timestamp;
294        self.last_event_id = Some(event.id.clone());
295    }
296
297    /// CRDT merge with another projection.
298    ///
299    /// Uses last-write-wins for scalar fields, union for messages.
300    pub fn merge(&mut self, other: &Self) {
301        // Last-write-wins for activity timestamp
302        if other.last_activity > self.last_activity {
303            self.state = other.state;
304            self.last_activity = other.last_activity;
305            self.last_event_id = other.last_event_id.clone();
306        }
307
308        // Union of messages (deduplicate by content hash would be ideal)
309        // For now, take the longer history
310        if other.messages.len() > self.messages.len() {
311            self.messages = other.messages.clone();
312            self.message_count = other.message_count;
313        }
314
315        // Merge custom state (last-write-wins per key)
316        for (key, value) in &other.custom_state {
317            self.custom_state.insert(key.clone(), value.clone());
318        }
319    }
320}
321
/// Event store backed by sled.
///
/// Holds the database handle plus two trees opened from it: `events` (the
/// per-session event log) and `sessions` (one serialized projection per
/// session key).
pub struct EventStore {
    // Database handle; used by `flush`.
    db: sled::Db,
    // Tree "events": key "<session_key>:<event id hex>", value JSON event.
    events_tree: sled::Tree,
    // Tree "sessions": key "<session_key>", value JSON projection.
    sessions_tree: sled::Tree,
}
328
329impl EventStore {
330    /// Open or create an event store.
331    ///
332    /// # Errors
333    ///
334    /// Returns error if database cannot be opened.
335    pub fn open(path: &Path) -> Result<Self, EventStoreError> {
336        let db = sled::open(path)?;
337        let events_tree = db.open_tree("events")?;
338        let sessions_tree = db.open_tree("sessions")?;
339
340        Ok(Self {
341            db,
342            events_tree,
343            sessions_tree,
344        })
345    }
346
347    /// Append an event to a session's event log.
348    ///
349    /// # Errors
350    ///
351    /// Returns error if storage fails.
352    pub fn append(&self, event: &SessionEvent) -> Result<EventId, EventStoreError> {
353        let event_key = format!("{}:{}", event.session_key, event.id.to_hex());
354        let event_data = serde_json::to_vec(event)?;
355
356        self.events_tree.insert(event_key.as_bytes(), event_data)?;
357
358        // Update session projection
359        self.update_projection(event)?;
360
361        Ok(event.id.clone())
362    }
363
364    /// Get all events for a session.
365    ///
366    /// # Errors
367    ///
368    /// Returns error if storage read fails.
369    pub fn get_events(
370        &self,
371        session_key: &SessionKey,
372    ) -> Result<Vec<SessionEvent>, EventStoreError> {
373        let prefix = format!("{session_key}:");
374        let mut events = Vec::new();
375
376        for result in self.events_tree.scan_prefix(prefix.as_bytes()) {
377            let (_, value) = result?;
378            let event: SessionEvent = serde_json::from_slice(&value)?;
379            events.push(event);
380        }
381
382        // Sort by timestamp
383        events.sort_by_key(|e| e.timestamp);
384        Ok(events)
385    }
386
387    /// Get events since a specific timestamp.
388    ///
389    /// # Errors
390    ///
391    /// Returns error if storage read fails.
392    pub fn get_events_since(
393        &self,
394        session_key: &SessionKey,
395        since: DateTime<Utc>,
396    ) -> Result<Vec<SessionEvent>, EventStoreError> {
397        let events = self.get_events(session_key)?;
398        Ok(events.into_iter().filter(|e| e.timestamp > since).collect())
399    }
400
401    /// Get the session projection.
402    ///
403    /// # Errors
404    ///
405    /// Returns error if storage read fails or projection not found.
406    pub fn get_projection(
407        &self,
408        session_key: &SessionKey,
409    ) -> Result<SessionProjection, EventStoreError> {
410        let key = session_key.as_ref().as_bytes();
411
412        match self.sessions_tree.get(key)? {
413            Some(data) => Ok(serde_json::from_slice(&data)?),
414            None => Err(EventStoreError::NotFound(session_key.to_string())),
415        }
416    }
417
418    /// List all session keys.
419    ///
420    /// # Errors
421    ///
422    /// Returns error if storage read fails.
423    pub fn list_sessions(&self) -> Result<Vec<SessionKey>, EventStoreError> {
424        let mut sessions = Vec::new();
425
426        for result in &self.sessions_tree {
427            let (key, _) = result?;
428            if let Ok(key_str) = std::str::from_utf8(&key) {
429                sessions.push(SessionKey::new(key_str));
430            }
431        }
432
433        Ok(sessions)
434    }
435
436    /// Update the session projection after appending an event.
437    fn update_projection(&self, event: &SessionEvent) -> Result<(), EventStoreError> {
438        let key = event.session_key.as_ref().as_bytes();
439
440        let mut projection = match self.sessions_tree.get(key)? {
441            Some(data) => serde_json::from_slice(&data)?,
442            None => {
443                // Create new projection from SessionStarted event
444                if let SessionEventKind::SessionStarted { channel, peer_id } = &event.kind {
445                    SessionProjection::new(
446                        event.session_key.clone(),
447                        event.agent_id.clone(),
448                        ChannelId::new(channel),
449                        peer_id.clone(),
450                    )
451                } else {
452                    // Create with defaults if no SessionStarted
453                    SessionProjection::new(
454                        event.session_key.clone(),
455                        event.agent_id.clone(),
456                        ChannelId::new("unknown"),
457                        "unknown".to_string(),
458                    )
459                }
460            }
461        };
462
463        projection.apply(event);
464
465        let projection_data = serde_json::to_vec(&projection)?;
466        self.sessions_tree.insert(key, projection_data)?;
467
468        Ok(())
469    }
470
471    /// Flush all pending writes to disk.
472    ///
473    /// # Errors
474    ///
475    /// Returns error if flush fails.
476    pub fn flush(&self) -> Result<(), EventStoreError> {
477        self.db.flush()?;
478        Ok(())
479    }
480}
481
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::AgentId;
    use tempfile::tempdir;

    /// Builds an inbound-message payload without attachments.
    fn inbound(text: &str) -> SessionEventKind {
        SessionEventKind::MessageReceived {
            content: text.to_string(),
            attachments: vec![],
        }
    }

    /// Same content hashes to the same id; different content does not.
    #[test]
    fn test_event_id_generation() {
        let first = EventId::from_content(b"test content");
        let repeat = EventId::from_content(b"test content");
        let other = EventId::from_content(b"different content");

        assert_eq!(first, repeat);
        assert_ne!(first, other);
    }

    /// `SessionEvent::new` keeps the session key and agent id it was given.
    #[test]
    fn test_session_event_creation() {
        let key = SessionKey::new("test-session");
        let created = SessionEvent::new(key.clone(), "agent1".to_string(), inbound("Hello"));

        assert_eq!(created.session_key, key);
        assert_eq!(created.agent_id, "agent1");
    }

    /// Applying one inbound message records and counts it.
    #[test]
    fn test_projection_apply() {
        let mut view = SessionProjection::new(
            SessionKey::new("test"),
            "agent".to_string(),
            ChannelId::telegram(),
            "user123".to_string(),
        );
        let incoming =
            SessionEvent::new(SessionKey::new("test"), "agent".to_string(), inbound("Hello"));

        view.apply(&incoming);

        assert_eq!(view.message_count, 1);
        assert_eq!(view.messages.len(), 1);
    }

    /// Events appended to the store can be read back, and the projection
    /// reflects them.
    #[test]
    fn test_event_store_roundtrip() {
        let dir = tempdir().unwrap();
        let store = EventStore::open(dir.path()).unwrap();

        let key = SessionKey::build(
            &AgentId::default_agent(),
            &ChannelId::telegram(),
            "bot123",
            crate::types::PeerType::Dm,
            &crate::types::PeerId::new("user456"),
        );

        // Start session
        let started = SessionEvent::new(
            key.clone(),
            "default".to_string(),
            SessionEventKind::SessionStarted {
                channel: "telegram".to_string(),
                peer_id: "user456".to_string(),
            },
        );
        store.append(&started).unwrap();

        // Add message
        let received = SessionEvent::new(key.clone(), "default".to_string(), inbound("Hello, agent!"));
        store.append(&received).unwrap();

        // Verify events
        let stored = store.get_events(&key).unwrap();
        assert_eq!(stored.len(), 2);

        // Verify projection
        let view = store.get_projection(&key).unwrap();
        assert_eq!(view.message_count, 1);
        assert_eq!(view.state, SessionState::Active);
    }
}
582}