Skip to main content

openclaw_node/
events.rs

1//! Event store bindings for session event storage.
2
3use napi::bindgen_prelude::*;
4use napi_derive::napi;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use openclaw_core::events::{EventStore, SessionEvent, SessionEventKind};
9use openclaw_core::types::{SessionKey, TokenUsage};
10
11use crate::error::OpenClawError;
12
13/// `OpenClaw` event store wrapper for Node.js.
14///
15/// Provides append-only event storage for session events with
16/// CRDT projections for conflict-free state.
17#[napi]
18pub struct NodeEventStore {
19    store: Arc<EventStore>,
20}
21
22#[napi]
23impl NodeEventStore {
24    /// Open or create an event store at the given path.
25    #[napi(constructor)]
26    pub fn new(path: String) -> Result<Self> {
27        let store = EventStore::open(&PathBuf::from(&path))
28            .map_err(|e| OpenClawError::event_store_error(format!("EventStore open error: {e}")))?;
29
30        Ok(Self {
31            store: Arc::new(store),
32        })
33    }
34
35    /// Append a session event and return the event ID.
36    ///
37    /// # Arguments
38    ///
39    /// * `session_key` - The session key
40    /// * `agent_id` - The agent ID
41    /// * `event_type` - Event type: "`session_started`", "`message_received`", "`message_sent`",
42    ///                  "`agent_response`", "`session_ended`", "`state_changed`", "`tool_called`", "`tool_result`"
43    /// * `data` - JSON data for the event
44    #[napi]
45    pub fn append_event(
46        &self,
47        session_key: String,
48        agent_id: String,
49        event_type: String,
50        data: String,
51    ) -> Result<String> {
52        let data: serde_json::Value = serde_json::from_str(&data)
53            .map_err(|e| OpenClawError::event_store_error(format!("Invalid JSON: {e}")))?;
54
55        let kind = parse_event_kind(&event_type, &data)?;
56
57        let event = SessionEvent::new(SessionKey::new(&session_key), agent_id, kind);
58
59        let event_id = self
60            .store
61            .append(&event)
62            .map_err(|e| OpenClawError::event_store_error(format!("Append error: {e}")))?;
63
64        Ok(event_id.to_hex())
65    }
66
67    /// Get all events for a session as JSON.
68    #[napi]
69    pub fn get_events(&self, session_key: String) -> Result<String> {
70        let events = self
71            .store
72            .get_events(&SessionKey::new(&session_key))
73            .map_err(|e| OpenClawError::event_store_error(format!("Query error: {e}")))?;
74
75        serde_json::to_string(&events).map_err(|e| {
76            OpenClawError::event_store_error(format!("Serialization error: {e}")).into()
77        })
78    }
79
80    /// Get the session projection as JSON.
81    ///
82    /// The projection is a materialized view of the session state
83    /// derived from the event stream.
84    #[napi]
85    pub fn get_projection(&self, session_key: String) -> Result<String> {
86        let projection = self
87            .store
88            .get_projection(&SessionKey::new(&session_key))
89            .map_err(|e| OpenClawError::event_store_error(format!("Projection error: {e}")))?;
90
91        serde_json::to_string(&projection).map_err(|e| {
92            OpenClawError::event_store_error(format!("Serialization error: {e}")).into()
93        })
94    }
95
96    /// List all session keys.
97    #[napi]
98    pub fn list_sessions(&self) -> Result<Vec<String>> {
99        let sessions = self
100            .store
101            .list_sessions()
102            .map_err(|e| OpenClawError::event_store_error(format!("List error: {e}")))?;
103
104        Ok(sessions
105            .into_iter()
106            .map(|s| s.as_ref().to_string())
107            .collect())
108    }
109
110    /// Flush pending writes to disk.
111    #[napi]
112    pub fn flush(&self) -> Result<()> {
113        self.store
114            .flush()
115            .map_err(|e| OpenClawError::event_store_error(format!("Flush error: {e}")).into())
116    }
117}
118
119/// Parse event type string into `SessionEventKind`.
120fn parse_event_kind(event_type: &str, data: &serde_json::Value) -> Result<SessionEventKind> {
121    match event_type {
122        "session_started" => Ok(SessionEventKind::SessionStarted {
123            channel: data["channel"].as_str().unwrap_or("unknown").to_string(),
124            peer_id: data["peer_id"].as_str().unwrap_or("unknown").to_string(),
125        }),
126        "message_received" => Ok(SessionEventKind::MessageReceived {
127            content: data["content"].as_str().unwrap_or("").to_string(),
128            attachments: vec![],
129        }),
130        "message_sent" => Ok(SessionEventKind::MessageSent {
131            content: data["content"].as_str().unwrap_or("").to_string(),
132            message_id: data["message_id"].as_str().unwrap_or("").to_string(),
133        }),
134        "agent_response" => Ok(SessionEventKind::AgentResponse {
135            content: data["content"].as_str().unwrap_or("").to_string(),
136            model: data["model"].as_str().unwrap_or("").to_string(),
137            tokens: TokenUsage {
138                input_tokens: data["tokens"]["input_tokens"].as_u64().unwrap_or(0),
139                output_tokens: data["tokens"]["output_tokens"].as_u64().unwrap_or(0),
140                cache_read_tokens: data["tokens"]["cache_read_tokens"].as_u64(),
141                cache_write_tokens: data["tokens"]["cache_write_tokens"].as_u64(),
142            },
143        }),
144        "session_ended" => Ok(SessionEventKind::SessionEnded {
145            reason: data["reason"].as_str().unwrap_or("unknown").to_string(),
146        }),
147        "state_changed" => Ok(SessionEventKind::StateChanged {
148            key: data["key"].as_str().unwrap_or("").to_string(),
149            value: data.get("value").cloned().unwrap_or_default(),
150        }),
151        "tool_called" => Ok(SessionEventKind::ToolCalled {
152            tool_name: data["tool_name"].as_str().unwrap_or("").to_string(),
153            params: data.get("params").cloned().unwrap_or_default(),
154        }),
155        "tool_result" => Ok(SessionEventKind::ToolResult {
156            tool_name: data["tool_name"].as_str().unwrap_or("").to_string(),
157            result: data.get("result").cloned().unwrap_or_default(),
158            success: data["success"].as_bool().unwrap_or(true),
159        }),
160        _ => Err(
161            OpenClawError::event_store_error(format!("Unknown event type: {event_type}")).into(),
162        ),
163    }
164}