distri_types/
stores.rs

1use crate::{
2    ScratchpadEntry, ToolAuthStore, ToolResponse, configuration::PluginArtifact,
3    workflow::WorkflowStore,
4};
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::Value;
8use std::{collections::HashMap, sync::Arc};
9use tokio::sync::oneshot;
10use uuid::Uuid;
11
12use crate::{
13    AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread, ThreadSummary,
14    UpdateThreadRequest, browser::BrowserSessionRecord,
15};
16
17// Redis and PostgreSQL stores moved to distri-stores crate
18
19/// Initialized store collection
20#[derive(Clone)]
21pub struct InitializedStores {
22    pub session_store: Arc<dyn SessionStore>,
23    pub agent_store: Arc<dyn AgentStore>,
24    pub task_store: Arc<dyn TaskStore>,
25    pub thread_store: Arc<dyn ThreadStore>,
26    pub tool_auth_store: Arc<dyn ToolAuthStore>,
27    pub scratchpad_store: Arc<dyn ScratchpadStore>,
28    pub workflow_store: Arc<dyn WorkflowStore>,
29    pub memory_store: Option<Arc<dyn MemoryStore>>,
30    pub crawl_store: Option<Arc<dyn CrawlStore>>,
31    pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
32    pub plugin_store: Arc<dyn PluginCatalogStore>,
33    pub browser_session_store: Arc<dyn BrowserSessionStore>,
34}
35impl InitializedStores {
36    pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
37        self.tool_auth_store = tool_auth_store;
38    }
39
40    pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
41        self.external_tool_calls_store = store;
42    }
43
44    pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
45        self.session_store = session_store;
46    }
47
48    pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
49        self.agent_store = agent_store;
50    }
51
52    pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
53        self.task_store = task_store;
54    }
55
56    pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
57        self.thread_store = thread_store;
58    }
59
60    pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
61        self.scratchpad_store = scratchpad_store;
62    }
63
64    pub fn with_workflow_store(mut self, workflow_store: Arc<dyn WorkflowStore>) {
65        self.workflow_store = workflow_store;
66    }
67
68    pub fn with_plugin_store(&mut self, plugin_store: Arc<dyn PluginCatalogStore>) {
69        self.plugin_store = plugin_store;
70    }
71
72    pub fn set_browser_session_store(&mut self, store: Arc<dyn BrowserSessionStore>) {
73        self.browser_session_store = store;
74    }
75}
76
77impl std::fmt::Debug for InitializedStores {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("InitializedStores").finish()
80    }
81}
82
83// SessionStore trait - manages current conversation thread/run
84#[async_trait::async_trait]
85pub trait SessionStore: Send + Sync + std::fmt::Debug {
86    async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;
87
88    async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;
89
90    async fn set_value_with_expiry(
91        &self,
92        namespace: &str,
93        key: &str,
94        value: &Value,
95        expiry: Option<chrono::DateTime<chrono::Utc>>,
96    ) -> anyhow::Result<()>;
97
98    async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;
99
100    async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;
101
102    async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;
103}
104#[async_trait::async_trait]
105pub trait SessionStoreExt: SessionStore {
106    async fn set<T: Serialize + Sync>(
107        &self,
108        namespace: &str,
109        key: &str,
110        value: &T,
111    ) -> anyhow::Result<()> {
112        self.set_value(namespace, key, &serde_json::to_value(value)?)
113            .await
114    }
115    async fn set_with_expiry<T: Serialize + Sync>(
116        &self,
117        namespace: &str,
118        key: &str,
119        value: &T,
120        expiry: Option<chrono::DateTime<chrono::Utc>>,
121    ) -> anyhow::Result<()> {
122        self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
123            .await
124    }
125    async fn get<T: DeserializeOwned + Sync>(
126        &self,
127        namespace: &str,
128        key: &str,
129    ) -> anyhow::Result<Option<T>> {
130        match self.get_value(namespace, key).await? {
131            Some(b) => Ok(Some(serde_json::from_value(b)?)),
132            None => Ok(None),
133        }
134    }
135}
136impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
137
138#[async_trait::async_trait]
139pub trait BrowserSessionStore: Send + Sync + std::fmt::Debug {
140    async fn save_session(&self, record: BrowserSessionRecord) -> anyhow::Result<()>;
141
142    async fn get_session(&self, user_id: &str) -> anyhow::Result<Option<BrowserSessionRecord>>;
143
144    async fn delete_session(&self, user_id: &str) -> anyhow::Result<()>;
145}
146
147// Higher-level MemoryStore trait - manages cross-session permanent memory using user_id
148#[async_trait::async_trait]
149pub trait MemoryStore: Send + Sync {
150    /// Store permanent memory from a session for cross-session access
151    async fn store_memory(
152        &self,
153        user_id: &str,
154        session_memory: SessionMemory,
155    ) -> anyhow::Result<()>;
156
157    /// Search for relevant memories across sessions for a user
158    async fn search_memories(
159        &self,
160        user_id: &str,
161        query: &str,
162        limit: Option<usize>,
163    ) -> anyhow::Result<Vec<String>>;
164
165    /// Get all permanent memories for a user
166    async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;
167
168    /// Clear all memories for a user
169    async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
170}
171
172#[derive(Debug, Clone)]
173pub struct SessionMemory {
174    pub agent_id: String,
175    pub thread_id: String,
176    pub session_summary: String,
177    pub key_insights: Vec<String>,
178    pub important_facts: Vec<String>,
179    pub timestamp: chrono::DateTime<chrono::Utc>,
180}
181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
182#[serde(tag = "type", rename_all = "snake_case")]
183pub enum FilterMessageType {
184    Events,
185    Messages,
186    Artifacts,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct MessageFilter {
191    pub filter: Option<Vec<FilterMessageType>>,
192    pub limit: Option<usize>,
193    pub offset: Option<usize>,
194}
195
196// Task Store trait for A2A task management
197#[async_trait]
198pub trait TaskStore: Send + Sync {
199    fn init_task(
200        &self,
201        context_id: &str,
202        task_id: Option<&str>,
203        status: Option<TaskStatus>,
204    ) -> Task {
205        let task_id = task_id.unwrap_or(&Uuid::new_v4().to_string()).to_string();
206        Task {
207            id: task_id,
208            status: status.unwrap_or(TaskStatus::Pending),
209            created_at: chrono::Utc::now().timestamp_millis(),
210            updated_at: chrono::Utc::now().timestamp_millis(),
211            thread_id: context_id.to_string(),
212            parent_task_id: None,
213        }
214    }
215    async fn get_or_create_task(
216        &self,
217        thread_id: &str,
218        task_id: &str,
219    ) -> Result<(), anyhow::Error> {
220        match self.get_task(&task_id).await? {
221            Some(task) => task,
222            None => {
223                self.create_task(&thread_id, Some(&task_id), Some(TaskStatus::Running))
224                    .await?
225            }
226        };
227
228        Ok(())
229    }
230    async fn create_task(
231        &self,
232        context_id: &str,
233        task_id: Option<&str>,
234        task_status: Option<TaskStatus>,
235    ) -> anyhow::Result<Task>;
236    async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
237    async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
238    async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
239    async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
240    async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
241    async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
242
243    async fn get_history(
244        &self,
245        thread_id: &str,
246        filter: Option<MessageFilter>,
247    ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
248
249    async fn update_parent_task(
250        &self,
251        task_id: &str,
252        parent_task_id: Option<&str>,
253    ) -> anyhow::Result<()>;
254}
255
256#[derive(Debug, Clone)]
257pub struct PluginMetadataRecord {
258    pub package_name: String,
259    pub version: Option<String>,
260    pub object_prefix: String,
261    pub entrypoint: Option<String>,
262    pub artifact: PluginArtifact,
263    pub updated_at: chrono::DateTime<chrono::Utc>,
264}
265
266#[async_trait]
267pub trait PluginCatalogStore: Send + Sync {
268    async fn list_plugins(&self) -> anyhow::Result<Vec<PluginMetadataRecord>>;
269
270    async fn get_plugin(&self, package_name: &str) -> anyhow::Result<Option<PluginMetadataRecord>>;
271
272    async fn upsert_plugin(&self, record: &PluginMetadataRecord) -> anyhow::Result<()>;
273
274    async fn remove_plugin(&self, package_name: &str) -> anyhow::Result<()>;
275
276    async fn clear(&self) -> anyhow::Result<()>;
277}
278
279// Thread Store trait for thread management
280#[async_trait]
281pub trait ThreadStore: Send + Sync {
282    fn as_any(&self) -> &dyn std::any::Any;
283    async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
284    async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
285    async fn update_thread(
286        &self,
287        thread_id: &str,
288        request: UpdateThreadRequest,
289    ) -> anyhow::Result<Thread>;
290    async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;
291    async fn list_threads(
292        &self,
293        agent_id: Option<&str>,
294        limit: Option<u32>,
295        offset: Option<u32>,
296        attributes_filter: Option<&serde_json::Value>,
297    ) -> anyhow::Result<Vec<ThreadSummary>>;
298    async fn update_thread_with_message(
299        &self,
300        thread_id: &str,
301        message: &str,
302    ) -> anyhow::Result<()>;
303}
304
305#[async_trait]
306pub trait AgentStore: Send + Sync {
307    async fn list(
308        &self,
309        cursor: Option<String>,
310        limit: Option<usize>,
311    ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
312
313    async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
314    async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
315    /// Update an existing agent with new definition
316    async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
317
318    async fn clear(&self) -> anyhow::Result<()>;
319}
320
321/// Store for managing scratchpad entries across conversations
322#[async_trait::async_trait]
323pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
324    /// Add a scratchpad entry for a specific thread
325    async fn add_entry(
326        &self,
327        thread_id: &str,
328        entry: ScratchpadEntry,
329    ) -> Result<(), crate::AgentError>;
330
331    /// Clear all scratchpad entries for a thread
332    async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;
333
334    /// Get entries for a specific task within a thread
335    async fn get_entries(
336        &self,
337        thread_id: &str,
338        task_id: &str,
339        limit: Option<usize>,
340    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
341
342    async fn get_all_entries(
343        &self,
344        thread_id: &str,
345        limit: Option<usize>,
346    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
347}
348
349/// Web crawl result data
350#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct CrawlResult {
352    pub id: String,
353    pub url: String,
354    pub title: Option<String>,
355    pub content: String,
356    pub html: Option<String>,
357    pub metadata: serde_json::Value,
358    pub links: Vec<String>,
359    pub images: Vec<String>,
360    pub status_code: Option<u16>,
361    pub crawled_at: chrono::DateTime<chrono::Utc>,
362    pub processing_time_ms: Option<u64>,
363}
364
365/// Store for managing web crawl results
366#[async_trait]
367pub trait CrawlStore: Send + Sync {
368    /// Store a crawl result
369    async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;
370
371    /// Get a crawl result by ID
372    async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;
373
374    /// Get crawl results for a specific URL
375    async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;
376
377    /// Get recent crawl results (within time limit)
378    async fn get_recent_crawl_results(
379        &self,
380        limit: Option<usize>,
381        since: Option<chrono::DateTime<chrono::Utc>>,
382    ) -> anyhow::Result<Vec<CrawlResult>>;
383
384    /// Check if URL was recently crawled (within cache duration)
385    async fn is_url_recently_crawled(
386        &self,
387        url: &str,
388        cache_duration: chrono::Duration,
389    ) -> anyhow::Result<Option<CrawlResult>>;
390
391    /// Delete crawl result
392    async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;
393
394    /// Clear all crawl results older than specified date
395    async fn cleanup_old_results(
396        &self,
397        before: chrono::DateTime<chrono::Utc>,
398    ) -> anyhow::Result<usize>;
399}
400
401/// Store for managing external tool call completions using oneshot channels
402#[async_trait]
403pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
404    /// Register a new external tool call session and return a receiver for the response
405    async fn register_external_tool_call(
406        &self,
407        session_id: &str,
408    ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;
409
410    /// Complete an external tool call by sending the response
411    async fn complete_external_tool_call(
412        &self,
413        session_id: &str,
414        tool_response: ToolResponse,
415    ) -> anyhow::Result<()>;
416
417    /// Remove a session (cleanup)
418    async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;
419
420    /// List all pending sessions (for debugging)
421    async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
422}