Skip to main content

cortexai_cloudflare/
kv.rs

1//! KV Store integration for conversation persistence
2
3use crate::error::{CloudflareError, Result};
4use cortexai_llm_client::Message;
5use serde::{Deserialize, Serialize};
6use worker::kv::KvStore as WorkerKvStore;
7
/// Wrapper around Cloudflare KV for conversation storage.
///
/// Holds the underlying Workers KV handle together with the TTL that is
/// attached to every entry written through this wrapper.
pub struct KvStore {
    /// Underlying Workers KV handle that all reads, writes and deletes go through.
    kv: WorkerKvStore,
    /// TTL for conversation entries in seconds (default: 24 hours)
    ttl: u64,
}
14
15impl KvStore {
16    /// Create a new KV store wrapper
17    pub fn new(kv: WorkerKvStore) -> Self {
18        Self {
19            kv,
20            ttl: 86400, // 24 hours default
21        }
22    }
23
24    /// Set custom TTL for conversation entries
25    pub fn with_ttl(mut self, ttl_seconds: u64) -> Self {
26        self.ttl = ttl_seconds;
27        self
28    }
29
30    /// Get conversation history for a session
31    pub async fn get_conversation(&self, session_id: &str) -> Result<Vec<Message>> {
32        let key = format!("conversation:{}", session_id);
33
34        match self.kv.get(&key).text().await {
35            Ok(Some(data)) => {
36                let conversation: Conversation = serde_json::from_str(&data)
37                    .map_err(|e| CloudflareError::KvError(e.to_string()))?;
38                Ok(conversation.messages)
39            }
40            Ok(None) => Ok(Vec::new()),
41            Err(e) => Err(CloudflareError::KvError(e.to_string())),
42        }
43    }
44
45    /// Save conversation history for a session
46    pub async fn save_conversation(&self, session_id: &str, messages: &[Message]) -> Result<()> {
47        let key = format!("conversation:{}", session_id);
48        let conversation = Conversation {
49            messages: messages.to_vec(),
50            updated_at: chrono_now(),
51        };
52
53        let data = serde_json::to_string(&conversation)
54            .map_err(|e| CloudflareError::KvError(e.to_string()))?;
55
56        self.kv
57            .put(&key, data)
58            .map_err(|e| CloudflareError::KvError(e.to_string()))?
59            .expiration_ttl(self.ttl)
60            .execute()
61            .await
62            .map_err(|e| CloudflareError::KvError(e.to_string()))?;
63
64        Ok(())
65    }
66
67    /// Append a message to conversation history
68    pub async fn append_message(&self, session_id: &str, message: Message) -> Result<()> {
69        let mut messages = self.get_conversation(session_id).await?;
70        messages.push(message);
71        self.save_conversation(session_id, &messages).await
72    }
73
74    /// Clear conversation history for a session
75    pub async fn clear_conversation(&self, session_id: &str) -> Result<()> {
76        let key = format!("conversation:{}", session_id);
77        self.kv
78            .delete(&key)
79            .await
80            .map_err(|e| CloudflareError::KvError(e.to_string()))?;
81        Ok(())
82    }
83
84    /// Get session metadata
85    pub async fn get_metadata(&self, session_id: &str) -> Result<Option<SessionMetadata>> {
86        let key = format!("metadata:{}", session_id);
87
88        match self.kv.get(&key).text().await {
89            Ok(Some(data)) => {
90                let metadata: SessionMetadata = serde_json::from_str(&data)
91                    .map_err(|e| CloudflareError::KvError(e.to_string()))?;
92                Ok(Some(metadata))
93            }
94            Ok(None) => Ok(None),
95            Err(e) => Err(CloudflareError::KvError(e.to_string())),
96        }
97    }
98
99    /// Save session metadata
100    pub async fn save_metadata(&self, session_id: &str, metadata: &SessionMetadata) -> Result<()> {
101        let key = format!("metadata:{}", session_id);
102        let data =
103            serde_json::to_string(metadata).map_err(|e| CloudflareError::KvError(e.to_string()))?;
104
105        self.kv
106            .put(&key, data)
107            .map_err(|e| CloudflareError::KvError(e.to_string()))?
108            .expiration_ttl(self.ttl)
109            .execute()
110            .await
111            .map_err(|e| CloudflareError::KvError(e.to_string()))?;
112
113        Ok(())
114    }
115}
116
/// Stored conversation structure.
///
/// This is the JSON shape written to KV for a conversation entry; the field
/// names are part of the stored format.
#[derive(Debug, Serialize, Deserialize)]
struct Conversation {
    /// Full message history for the session.
    messages: Vec<Message>,
    /// Timestamp string recorded when the conversation was last saved.
    updated_at: String,
}
123
/// Session metadata for tracking.
///
/// Serialized to JSON when stored in KV, so the field names are part of the
/// stored format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Session identifier
    pub session_id: String,
    /// Total tokens used in session
    pub total_tokens: u32,
    /// Number of messages exchanged
    pub message_count: u32,
    /// When the session was created (timestamp string)
    pub created_at: String,
    /// When the session was last updated (timestamp string)
    pub updated_at: String,
    /// Custom metadata: optional arbitrary JSON value
    pub custom: Option<serde_json::Value>,
}
140
141impl SessionMetadata {
142    /// Create new session metadata
143    pub fn new(session_id: impl Into<String>) -> Self {
144        let now = chrono_now();
145        Self {
146            session_id: session_id.into(),
147            total_tokens: 0,
148            message_count: 0,
149            created_at: now.clone(),
150            updated_at: now,
151            custom: None,
152        }
153    }
154
155    /// Update token count
156    pub fn add_tokens(&mut self, tokens: u32) {
157        self.total_tokens += tokens;
158        self.updated_at = chrono_now();
159    }
160
161    /// Increment message count
162    pub fn increment_messages(&mut self) {
163        self.message_count += 1;
164        self.updated_at = chrono_now();
165    }
166}
167
168/// Get current timestamp as ISO string (WASM-compatible)
169fn chrono_now() -> String {
170    // In WASM, we use js_sys for time
171    let now = js_sys::Date::new_0();
172    now.to_iso_string().as_string().unwrap_or_default()
173}