distri_types/
stores.rs

1use crate::{
2    ScratchpadEntry, ToolAuthStore, ToolResponse, configuration::PluginArtifact,
3    workflow::WorkflowStore,
4};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize, de::DeserializeOwned};
8use serde_json::Value;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::oneshot;
11use uuid::Uuid;
12
13use crate::{
14    AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread, ThreadSummary,
15    UpdateThreadRequest, browser::BrowserSessionRecord,
16};
17
18// Redis and PostgreSQL stores moved to distri-stores crate
19
20/// Filter for listing threads
21#[derive(Debug, Clone, Default, Serialize, Deserialize)]
22pub struct ThreadListFilter {
23    /// Filter by agent ID
24    pub agent_id: Option<String>,
25    /// Filter by external ID (for integration with external systems)
26    pub external_id: Option<String>,
27    /// Filter by thread attributes (JSON matching)
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub attributes: Option<serde_json::Value>,
30}
31
32/// Initialized store collection
33#[derive(Clone)]
34pub struct InitializedStores {
35    pub session_store: Arc<dyn SessionStore>,
36    pub agent_store: Arc<dyn AgentStore>,
37    pub task_store: Arc<dyn TaskStore>,
38    pub thread_store: Arc<dyn ThreadStore>,
39    pub tool_auth_store: Arc<dyn ToolAuthStore>,
40    pub scratchpad_store: Arc<dyn ScratchpadStore>,
41    pub workflow_store: Arc<dyn WorkflowStore>,
42    pub memory_store: Option<Arc<dyn MemoryStore>>,
43    pub crawl_store: Option<Arc<dyn CrawlStore>>,
44    pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
45    pub plugin_store: Arc<dyn PluginCatalogStore>,
46    pub browser_session_store: Arc<dyn BrowserSessionStore>,
47    pub prompt_template_store: Option<Arc<dyn PromptTemplateStore>>,
48    pub secret_store: Option<Arc<dyn SecretStore>>,
49}
50impl InitializedStores {
51    pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
52        self.tool_auth_store = tool_auth_store;
53    }
54
55    pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
56        self.external_tool_calls_store = store;
57    }
58
59    pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
60        self.session_store = session_store;
61    }
62
63    pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
64        self.agent_store = agent_store;
65    }
66
67    pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
68        self.task_store = task_store;
69    }
70
71    pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
72        self.thread_store = thread_store;
73    }
74
75    pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
76        self.scratchpad_store = scratchpad_store;
77    }
78
79    pub fn with_workflow_store(mut self, workflow_store: Arc<dyn WorkflowStore>) {
80        self.workflow_store = workflow_store;
81    }
82
83    pub fn with_plugin_store(&mut self, plugin_store: Arc<dyn PluginCatalogStore>) {
84        self.plugin_store = plugin_store;
85    }
86
87    pub fn set_browser_session_store(&mut self, store: Arc<dyn BrowserSessionStore>) {
88        self.browser_session_store = store;
89    }
90}
91
92impl std::fmt::Debug for InitializedStores {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("InitializedStores").finish()
95    }
96}
97
98#[derive(Debug, Serialize, Deserialize, Clone)]
99pub struct SessionSummary {
100    pub session_id: String,
101    pub keys: Vec<String>,
102    pub key_count: usize,
103    pub updated_at: Option<DateTime<Utc>>,
104}
105
106// SessionStore trait - manages current conversation thread/run
107#[async_trait::async_trait]
108pub trait SessionStore: Send + Sync + std::fmt::Debug {
109    async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;
110
111    async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;
112
113    async fn set_value_with_expiry(
114        &self,
115        namespace: &str,
116        key: &str,
117        value: &Value,
118        expiry: Option<chrono::DateTime<chrono::Utc>>,
119    ) -> anyhow::Result<()>;
120
121    async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;
122
123    async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;
124
125    async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;
126
127    async fn list_sessions(
128        &self,
129        namespace: Option<&str>,
130        limit: Option<usize>,
131        offset: Option<usize>,
132    ) -> anyhow::Result<Vec<SessionSummary>>;
133}
134#[async_trait::async_trait]
135pub trait SessionStoreExt: SessionStore {
136    async fn set<T: Serialize + Sync>(
137        &self,
138        namespace: &str,
139        key: &str,
140        value: &T,
141    ) -> anyhow::Result<()> {
142        self.set_value(namespace, key, &serde_json::to_value(value)?)
143            .await
144    }
145    async fn set_with_expiry<T: Serialize + Sync>(
146        &self,
147        namespace: &str,
148        key: &str,
149        value: &T,
150        expiry: Option<chrono::DateTime<chrono::Utc>>,
151    ) -> anyhow::Result<()> {
152        self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
153            .await
154    }
155    async fn get<T: DeserializeOwned + Sync>(
156        &self,
157        namespace: &str,
158        key: &str,
159    ) -> anyhow::Result<Option<T>> {
160        match self.get_value(namespace, key).await? {
161            Some(b) => Ok(Some(serde_json::from_value(b)?)),
162            None => Ok(None),
163        }
164    }
165}
166impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
167
168#[async_trait::async_trait]
169pub trait BrowserSessionStore: Send + Sync + std::fmt::Debug {
170    async fn save_session(&self, record: BrowserSessionRecord) -> anyhow::Result<()>;
171
172    async fn get_session(&self, user_id: &str) -> anyhow::Result<Option<BrowserSessionRecord>>;
173
174    async fn delete_session(&self, user_id: &str) -> anyhow::Result<()>;
175}
176
177// Higher-level MemoryStore trait - manages cross-session permanent memory using user_id
178#[async_trait::async_trait]
179pub trait MemoryStore: Send + Sync {
180    /// Store permanent memory from a session for cross-session access
181    async fn store_memory(
182        &self,
183        user_id: &str,
184        session_memory: SessionMemory,
185    ) -> anyhow::Result<()>;
186
187    /// Search for relevant memories across sessions for a user
188    async fn search_memories(
189        &self,
190        user_id: &str,
191        query: &str,
192        limit: Option<usize>,
193    ) -> anyhow::Result<Vec<String>>;
194
195    /// Get all permanent memories for a user
196    async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;
197
198    /// Clear all memories for a user
199    async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
200}
201
202#[derive(Debug, Clone)]
203pub struct SessionMemory {
204    pub agent_id: String,
205    pub thread_id: String,
206    pub session_summary: String,
207    pub key_insights: Vec<String>,
208    pub important_facts: Vec<String>,
209    pub timestamp: chrono::DateTime<chrono::Utc>,
210}
211#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212#[serde(tag = "type", rename_all = "snake_case")]
213pub enum FilterMessageType {
214    Events,
215    Messages,
216    Artifacts,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct MessageFilter {
221    pub filter: Option<Vec<FilterMessageType>>,
222    pub limit: Option<usize>,
223    pub offset: Option<usize>,
224}
225
226// Task Store trait for A2A task management
227#[async_trait]
228pub trait TaskStore: Send + Sync {
229    fn init_task(
230        &self,
231        context_id: &str,
232        task_id: Option<&str>,
233        status: Option<TaskStatus>,
234    ) -> Task {
235        let task_id = task_id.unwrap_or(&Uuid::new_v4().to_string()).to_string();
236        Task {
237            id: task_id,
238            status: status.unwrap_or(TaskStatus::Pending),
239            created_at: chrono::Utc::now().timestamp_millis(),
240            updated_at: chrono::Utc::now().timestamp_millis(),
241            thread_id: context_id.to_string(),
242            parent_task_id: None,
243        }
244    }
245    async fn get_or_create_task(
246        &self,
247        thread_id: &str,
248        task_id: &str,
249    ) -> Result<(), anyhow::Error> {
250        match self.get_task(&task_id).await? {
251            Some(task) => task,
252            None => {
253                self.create_task(&thread_id, Some(&task_id), Some(TaskStatus::Running))
254                    .await?
255            }
256        };
257
258        Ok(())
259    }
260    async fn create_task(
261        &self,
262        context_id: &str,
263        task_id: Option<&str>,
264        task_status: Option<TaskStatus>,
265    ) -> anyhow::Result<Task>;
266    async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
267    async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
268    async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
269    async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
270    async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
271    async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
272
273    async fn get_history(
274        &self,
275        thread_id: &str,
276        filter: Option<MessageFilter>,
277    ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
278
279    async fn update_parent_task(
280        &self,
281        task_id: &str,
282        parent_task_id: Option<&str>,
283    ) -> anyhow::Result<()>;
284}
285
286#[derive(Debug, Clone)]
287pub struct PluginMetadataRecord {
288    pub package_name: String,
289    pub version: Option<String>,
290    pub object_prefix: String,
291    pub entrypoint: Option<String>,
292    pub artifact: PluginArtifact,
293    pub updated_at: chrono::DateTime<chrono::Utc>,
294}
295
296#[async_trait]
297pub trait PluginCatalogStore: Send + Sync {
298    async fn list_plugins(&self) -> anyhow::Result<Vec<PluginMetadataRecord>>;
299
300    async fn get_plugin(&self, package_name: &str) -> anyhow::Result<Option<PluginMetadataRecord>>;
301
302    async fn upsert_plugin(&self, record: &PluginMetadataRecord) -> anyhow::Result<()>;
303
304    async fn remove_plugin(&self, package_name: &str) -> anyhow::Result<()>;
305
306    async fn clear(&self) -> anyhow::Result<()>;
307}
308
309// Thread Store trait for thread management
310#[async_trait]
311pub trait ThreadStore: Send + Sync {
312    fn as_any(&self) -> &dyn std::any::Any;
313    async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
314    async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
315    async fn update_thread(
316        &self,
317        thread_id: &str,
318        request: UpdateThreadRequest,
319    ) -> anyhow::Result<Thread>;
320    async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;
321
322    async fn list_threads(
323        &self,
324        filter: &ThreadListFilter,
325        limit: Option<u32>,
326        offset: Option<u32>,
327    ) -> anyhow::Result<Vec<ThreadSummary>>;
328    async fn update_thread_with_message(
329        &self,
330        thread_id: &str,
331        message: &str,
332    ) -> anyhow::Result<()>;
333
334    /// Get aggregated home statistics
335    async fn get_home_stats(&self) -> anyhow::Result<HomeStats>;
336}
337
338/// Home statistics for dashboard
339#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
340pub struct HomeStats {
341    pub total_agents: i64,
342    pub total_threads: i64,
343    pub total_messages: i64,
344    pub avg_run_time_ms: Option<f64>,
345    // Cloud-specific fields (optional)
346    #[serde(skip_serializing_if = "Option::is_none")]
347    pub total_owned_agents: Option<i64>,
348    #[serde(skip_serializing_if = "Option::is_none")]
349    pub total_accessible_agents: Option<i64>,
350    #[serde(skip_serializing_if = "Option::is_none")]
351    pub most_active_agent: Option<MostActiveAgent>,
352    #[serde(skip_serializing_if = "Option::is_none")]
353    pub latest_threads: Option<Vec<LatestThreadInfo>>,
354}
355
356#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
357pub struct MostActiveAgent {
358    pub id: String,
359    pub name: String,
360    pub thread_count: i64,
361}
362
363#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
364pub struct LatestThreadInfo {
365    pub id: String,
366    pub title: String,
367    pub agent_id: String,
368    pub agent_name: String,
369    pub updated_at: chrono::DateTime<chrono::Utc>,
370}
371
372#[async_trait]
373pub trait AgentStore: Send + Sync {
374    async fn list(
375        &self,
376        cursor: Option<String>,
377        limit: Option<usize>,
378    ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
379
380    async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
381    async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
382    /// Update an existing agent with new definition
383    async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
384
385    async fn clear(&self) -> anyhow::Result<()>;
386}
387
388/// Store for managing scratchpad entries across conversations
389#[async_trait::async_trait]
390pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
391    /// Add a scratchpad entry for a specific thread
392    async fn add_entry(
393        &self,
394        thread_id: &str,
395        entry: ScratchpadEntry,
396    ) -> Result<(), crate::AgentError>;
397
398    /// Clear all scratchpad entries for a thread
399    async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;
400
401    /// Get entries for a specific task within a thread
402    async fn get_entries(
403        &self,
404        thread_id: &str,
405        task_id: &str,
406        limit: Option<usize>,
407    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
408
409    async fn get_all_entries(
410        &self,
411        thread_id: &str,
412        limit: Option<usize>,
413    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
414}
415
416/// Web crawl result data
417#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct CrawlResult {
419    pub id: String,
420    pub url: String,
421    pub title: Option<String>,
422    pub content: String,
423    pub html: Option<String>,
424    pub metadata: serde_json::Value,
425    pub links: Vec<String>,
426    pub images: Vec<String>,
427    pub status_code: Option<u16>,
428    pub crawled_at: chrono::DateTime<chrono::Utc>,
429    pub processing_time_ms: Option<u64>,
430}
431
432/// Store for managing web crawl results
433#[async_trait]
434pub trait CrawlStore: Send + Sync {
435    /// Store a crawl result
436    async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;
437
438    /// Get a crawl result by ID
439    async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;
440
441    /// Get crawl results for a specific URL
442    async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;
443
444    /// Get recent crawl results (within time limit)
445    async fn get_recent_crawl_results(
446        &self,
447        limit: Option<usize>,
448        since: Option<chrono::DateTime<chrono::Utc>>,
449    ) -> anyhow::Result<Vec<CrawlResult>>;
450
451    /// Check if URL was recently crawled (within cache duration)
452    async fn is_url_recently_crawled(
453        &self,
454        url: &str,
455        cache_duration: chrono::Duration,
456    ) -> anyhow::Result<Option<CrawlResult>>;
457
458    /// Delete crawl result
459    async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;
460
461    /// Clear all crawl results older than specified date
462    async fn cleanup_old_results(
463        &self,
464        before: chrono::DateTime<chrono::Utc>,
465    ) -> anyhow::Result<usize>;
466}
467
468/// Store for managing external tool call completions using oneshot channels
469#[async_trait]
470pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
471    /// Register a new external tool call session and return a receiver for the response
472    async fn register_external_tool_call(
473        &self,
474        session_id: &str,
475    ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;
476
477    /// Complete an external tool call by sending the response
478    async fn complete_external_tool_call(
479        &self,
480        session_id: &str,
481        tool_response: ToolResponse,
482    ) -> anyhow::Result<()>;
483
484    /// Remove a session (cleanup)
485    async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;
486
487    /// List all pending sessions (for debugging)
488    async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
489}
490
491// ========== Prompt Template Store ==========
492
493#[derive(Debug, Clone, Serialize, Deserialize)]
494pub struct PromptTemplateRecord {
495    pub id: String,
496    pub name: String,
497    pub template: String,
498    pub description: Option<String>,
499    pub version: Option<String>,
500    pub is_system: bool,
501    pub created_at: chrono::DateTime<chrono::Utc>,
502    pub updated_at: chrono::DateTime<chrono::Utc>,
503}
504
505#[derive(Debug, Clone, Serialize, Deserialize)]
506pub struct NewPromptTemplate {
507    pub name: String,
508    pub template: String,
509    pub description: Option<String>,
510    pub version: Option<String>,
511    pub is_system: bool,
512}
513
514#[derive(Debug, Clone, Serialize, Deserialize)]
515pub struct UpdatePromptTemplate {
516    pub name: String,
517    pub template: String,
518    pub description: Option<String>,
519}
520
521#[async_trait]
522pub trait PromptTemplateStore: Send + Sync {
523    async fn list(&self) -> anyhow::Result<Vec<PromptTemplateRecord>>;
524    async fn get(&self, id: &str) -> anyhow::Result<Option<PromptTemplateRecord>>;
525    async fn create(&self, template: NewPromptTemplate) -> anyhow::Result<PromptTemplateRecord>;
526    async fn update(
527        &self,
528        id: &str,
529        update: UpdatePromptTemplate,
530    ) -> anyhow::Result<PromptTemplateRecord>;
531    async fn delete(&self, id: &str) -> anyhow::Result<()>;
532    async fn clone_template(&self, id: &str) -> anyhow::Result<PromptTemplateRecord>;
533    async fn sync_system_templates(&self, templates: Vec<NewPromptTemplate>) -> anyhow::Result<()>;
534}
535
536// ========== Secret Store ==========
537
538#[derive(Debug, Clone, Serialize, Deserialize)]
539pub struct SecretRecord {
540    pub id: String,
541    pub key: String,
542    pub value: String,
543    pub created_at: chrono::DateTime<chrono::Utc>,
544    pub updated_at: chrono::DateTime<chrono::Utc>,
545}
546
547#[derive(Debug, Clone, Serialize, Deserialize)]
548pub struct NewSecret {
549    pub key: String,
550    pub value: String,
551}
552
553#[async_trait]
554pub trait SecretStore: Send + Sync {
555    async fn list(&self) -> anyhow::Result<Vec<SecretRecord>>;
556    async fn get(&self, key: &str) -> anyhow::Result<Option<SecretRecord>>;
557    async fn create(&self, secret: NewSecret) -> anyhow::Result<SecretRecord>;
558    async fn update(&self, key: &str, value: &str) -> anyhow::Result<SecretRecord>;
559    async fn delete(&self, key: &str) -> anyhow::Result<()>;
560}