1use crate::{
2 ScratchpadEntry, ToolAuthStore, ToolResponse, configuration::PluginArtifact,
3 workflow::WorkflowStore,
4};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize, de::DeserializeOwned};
8use serde_json::Value;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::oneshot;
11use uuid::Uuid;
12
13use crate::{
14 AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread, ThreadSummary,
15 UpdateThreadRequest, browser::BrowserSessionRecord,
16};
17
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
22pub struct ThreadListFilter {
23 pub agent_id: Option<String>,
25 pub external_id: Option<String>,
27 #[serde(skip_serializing_if = "Option::is_none")]
29 pub attributes: Option<serde_json::Value>,
30}
31
32#[derive(Clone)]
34pub struct InitializedStores {
35 pub session_store: Arc<dyn SessionStore>,
36 pub agent_store: Arc<dyn AgentStore>,
37 pub task_store: Arc<dyn TaskStore>,
38 pub thread_store: Arc<dyn ThreadStore>,
39 pub tool_auth_store: Arc<dyn ToolAuthStore>,
40 pub scratchpad_store: Arc<dyn ScratchpadStore>,
41 pub workflow_store: Arc<dyn WorkflowStore>,
42 pub memory_store: Option<Arc<dyn MemoryStore>>,
43 pub crawl_store: Option<Arc<dyn CrawlStore>>,
44 pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
45 pub plugin_store: Arc<dyn PluginCatalogStore>,
46 pub browser_session_store: Arc<dyn BrowserSessionStore>,
47 pub prompt_template_store: Option<Arc<dyn PromptTemplateStore>>,
48 pub secret_store: Option<Arc<dyn SecretStore>>,
49}
50impl InitializedStores {
51 pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
52 self.tool_auth_store = tool_auth_store;
53 }
54
55 pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
56 self.external_tool_calls_store = store;
57 }
58
59 pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
60 self.session_store = session_store;
61 }
62
63 pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
64 self.agent_store = agent_store;
65 }
66
67 pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
68 self.task_store = task_store;
69 }
70
71 pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
72 self.thread_store = thread_store;
73 }
74
75 pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
76 self.scratchpad_store = scratchpad_store;
77 }
78
79 pub fn with_workflow_store(mut self, workflow_store: Arc<dyn WorkflowStore>) {
80 self.workflow_store = workflow_store;
81 }
82
83 pub fn with_plugin_store(&mut self, plugin_store: Arc<dyn PluginCatalogStore>) {
84 self.plugin_store = plugin_store;
85 }
86
87 pub fn set_browser_session_store(&mut self, store: Arc<dyn BrowserSessionStore>) {
88 self.browser_session_store = store;
89 }
90}
91
92impl std::fmt::Debug for InitializedStores {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 f.debug_struct("InitializedStores").finish()
95 }
96}
97
98#[derive(Debug, Serialize, Deserialize, Clone)]
99pub struct SessionSummary {
100 pub session_id: String,
101 pub keys: Vec<String>,
102 pub key_count: usize,
103 pub updated_at: Option<DateTime<Utc>>,
104}
105
106#[async_trait::async_trait]
108pub trait SessionStore: Send + Sync + std::fmt::Debug {
109 async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;
110
111 async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;
112
113 async fn set_value_with_expiry(
114 &self,
115 namespace: &str,
116 key: &str,
117 value: &Value,
118 expiry: Option<chrono::DateTime<chrono::Utc>>,
119 ) -> anyhow::Result<()>;
120
121 async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;
122
123 async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;
124
125 async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;
126
127 async fn list_sessions(
128 &self,
129 namespace: Option<&str>,
130 limit: Option<usize>,
131 offset: Option<usize>,
132 ) -> anyhow::Result<Vec<SessionSummary>>;
133}
134#[async_trait::async_trait]
135pub trait SessionStoreExt: SessionStore {
136 async fn set<T: Serialize + Sync>(
137 &self,
138 namespace: &str,
139 key: &str,
140 value: &T,
141 ) -> anyhow::Result<()> {
142 self.set_value(namespace, key, &serde_json::to_value(value)?)
143 .await
144 }
145 async fn set_with_expiry<T: Serialize + Sync>(
146 &self,
147 namespace: &str,
148 key: &str,
149 value: &T,
150 expiry: Option<chrono::DateTime<chrono::Utc>>,
151 ) -> anyhow::Result<()> {
152 self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
153 .await
154 }
155 async fn get<T: DeserializeOwned + Sync>(
156 &self,
157 namespace: &str,
158 key: &str,
159 ) -> anyhow::Result<Option<T>> {
160 match self.get_value(namespace, key).await? {
161 Some(b) => Ok(Some(serde_json::from_value(b)?)),
162 None => Ok(None),
163 }
164 }
165}
166impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
167
168#[async_trait::async_trait]
169pub trait BrowserSessionStore: Send + Sync + std::fmt::Debug {
170 async fn save_session(&self, record: BrowserSessionRecord) -> anyhow::Result<()>;
171
172 async fn get_session(&self, user_id: &str) -> anyhow::Result<Option<BrowserSessionRecord>>;
173
174 async fn delete_session(&self, user_id: &str) -> anyhow::Result<()>;
175}
176
177#[async_trait::async_trait]
179pub trait MemoryStore: Send + Sync {
180 async fn store_memory(
182 &self,
183 user_id: &str,
184 session_memory: SessionMemory,
185 ) -> anyhow::Result<()>;
186
187 async fn search_memories(
189 &self,
190 user_id: &str,
191 query: &str,
192 limit: Option<usize>,
193 ) -> anyhow::Result<Vec<String>>;
194
195 async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;
197
198 async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
200}
201
202#[derive(Debug, Clone)]
203pub struct SessionMemory {
204 pub agent_id: String,
205 pub thread_id: String,
206 pub session_summary: String,
207 pub key_insights: Vec<String>,
208 pub important_facts: Vec<String>,
209 pub timestamp: chrono::DateTime<chrono::Utc>,
210}
211#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
212#[serde(tag = "type", rename_all = "snake_case")]
213pub enum FilterMessageType {
214 Events,
215 Messages,
216 Artifacts,
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
220pub struct MessageFilter {
221 pub filter: Option<Vec<FilterMessageType>>,
222 pub limit: Option<usize>,
223 pub offset: Option<usize>,
224}
225
226#[async_trait]
228pub trait TaskStore: Send + Sync {
229 fn init_task(
230 &self,
231 context_id: &str,
232 task_id: Option<&str>,
233 status: Option<TaskStatus>,
234 ) -> Task {
235 let task_id = task_id.unwrap_or(&Uuid::new_v4().to_string()).to_string();
236 Task {
237 id: task_id,
238 status: status.unwrap_or(TaskStatus::Pending),
239 created_at: chrono::Utc::now().timestamp_millis(),
240 updated_at: chrono::Utc::now().timestamp_millis(),
241 thread_id: context_id.to_string(),
242 parent_task_id: None,
243 }
244 }
245 async fn get_or_create_task(
246 &self,
247 thread_id: &str,
248 task_id: &str,
249 ) -> Result<(), anyhow::Error> {
250 match self.get_task(&task_id).await? {
251 Some(task) => task,
252 None => {
253 self.create_task(&thread_id, Some(&task_id), Some(TaskStatus::Running))
254 .await?
255 }
256 };
257
258 Ok(())
259 }
260 async fn create_task(
261 &self,
262 context_id: &str,
263 task_id: Option<&str>,
264 task_status: Option<TaskStatus>,
265 ) -> anyhow::Result<Task>;
266 async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
267 async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
268 async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
269 async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
270 async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
271 async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
272
273 async fn get_history(
274 &self,
275 thread_id: &str,
276 filter: Option<MessageFilter>,
277 ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
278
279 async fn update_parent_task(
280 &self,
281 task_id: &str,
282 parent_task_id: Option<&str>,
283 ) -> anyhow::Result<()>;
284}
285
286#[derive(Debug, Clone)]
287pub struct PluginMetadataRecord {
288 pub package_name: String,
289 pub version: Option<String>,
290 pub object_prefix: String,
291 pub entrypoint: Option<String>,
292 pub artifact: PluginArtifact,
293 pub updated_at: chrono::DateTime<chrono::Utc>,
294}
295
296#[async_trait]
297pub trait PluginCatalogStore: Send + Sync {
298 async fn list_plugins(&self) -> anyhow::Result<Vec<PluginMetadataRecord>>;
299
300 async fn get_plugin(&self, package_name: &str) -> anyhow::Result<Option<PluginMetadataRecord>>;
301
302 async fn upsert_plugin(&self, record: &PluginMetadataRecord) -> anyhow::Result<()>;
303
304 async fn remove_plugin(&self, package_name: &str) -> anyhow::Result<()>;
305
306 async fn clear(&self) -> anyhow::Result<()>;
307}
308
309#[async_trait]
311pub trait ThreadStore: Send + Sync {
312 fn as_any(&self) -> &dyn std::any::Any;
313 async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
314 async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
315 async fn update_thread(
316 &self,
317 thread_id: &str,
318 request: UpdateThreadRequest,
319 ) -> anyhow::Result<Thread>;
320 async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;
321
322 async fn list_threads(
323 &self,
324 filter: &ThreadListFilter,
325 limit: Option<u32>,
326 offset: Option<u32>,
327 ) -> anyhow::Result<Vec<ThreadSummary>>;
328 async fn update_thread_with_message(
329 &self,
330 thread_id: &str,
331 message: &str,
332 ) -> anyhow::Result<()>;
333
334 async fn get_home_stats(&self) -> anyhow::Result<HomeStats>;
336}
337
338#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
340pub struct HomeStats {
341 pub total_agents: i64,
342 pub total_threads: i64,
343 pub total_messages: i64,
344 pub avg_run_time_ms: Option<f64>,
345 #[serde(skip_serializing_if = "Option::is_none")]
347 pub total_owned_agents: Option<i64>,
348 #[serde(skip_serializing_if = "Option::is_none")]
349 pub total_accessible_agents: Option<i64>,
350 #[serde(skip_serializing_if = "Option::is_none")]
351 pub most_active_agent: Option<MostActiveAgent>,
352 #[serde(skip_serializing_if = "Option::is_none")]
353 pub latest_threads: Option<Vec<LatestThreadInfo>>,
354}
355
356#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
357pub struct MostActiveAgent {
358 pub id: String,
359 pub name: String,
360 pub thread_count: i64,
361}
362
363#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
364pub struct LatestThreadInfo {
365 pub id: String,
366 pub title: String,
367 pub agent_id: String,
368 pub agent_name: String,
369 pub updated_at: chrono::DateTime<chrono::Utc>,
370}
371
372#[async_trait]
373pub trait AgentStore: Send + Sync {
374 async fn list(
375 &self,
376 cursor: Option<String>,
377 limit: Option<usize>,
378 ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
379
380 async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
381 async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
382 async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
384
385 async fn clear(&self) -> anyhow::Result<()>;
386}
387
388#[async_trait::async_trait]
390pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
391 async fn add_entry(
393 &self,
394 thread_id: &str,
395 entry: ScratchpadEntry,
396 ) -> Result<(), crate::AgentError>;
397
398 async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;
400
401 async fn get_entries(
403 &self,
404 thread_id: &str,
405 task_id: &str,
406 limit: Option<usize>,
407 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
408
409 async fn get_all_entries(
410 &self,
411 thread_id: &str,
412 limit: Option<usize>,
413 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
414}
415
416#[derive(Debug, Clone, Serialize, Deserialize)]
418pub struct CrawlResult {
419 pub id: String,
420 pub url: String,
421 pub title: Option<String>,
422 pub content: String,
423 pub html: Option<String>,
424 pub metadata: serde_json::Value,
425 pub links: Vec<String>,
426 pub images: Vec<String>,
427 pub status_code: Option<u16>,
428 pub crawled_at: chrono::DateTime<chrono::Utc>,
429 pub processing_time_ms: Option<u64>,
430}
431
432#[async_trait]
434pub trait CrawlStore: Send + Sync {
435 async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;
437
438 async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;
440
441 async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;
443
444 async fn get_recent_crawl_results(
446 &self,
447 limit: Option<usize>,
448 since: Option<chrono::DateTime<chrono::Utc>>,
449 ) -> anyhow::Result<Vec<CrawlResult>>;
450
451 async fn is_url_recently_crawled(
453 &self,
454 url: &str,
455 cache_duration: chrono::Duration,
456 ) -> anyhow::Result<Option<CrawlResult>>;
457
458 async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;
460
461 async fn cleanup_old_results(
463 &self,
464 before: chrono::DateTime<chrono::Utc>,
465 ) -> anyhow::Result<usize>;
466}
467
468#[async_trait]
470pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
471 async fn register_external_tool_call(
473 &self,
474 session_id: &str,
475 ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;
476
477 async fn complete_external_tool_call(
479 &self,
480 session_id: &str,
481 tool_response: ToolResponse,
482 ) -> anyhow::Result<()>;
483
484 async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;
486
487 async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
489}
490
491#[derive(Debug, Clone, Serialize, Deserialize)]
494pub struct PromptTemplateRecord {
495 pub id: String,
496 pub name: String,
497 pub template: String,
498 pub description: Option<String>,
499 pub version: Option<String>,
500 pub is_system: bool,
501 pub created_at: chrono::DateTime<chrono::Utc>,
502 pub updated_at: chrono::DateTime<chrono::Utc>,
503}
504
505#[derive(Debug, Clone, Serialize, Deserialize)]
506pub struct NewPromptTemplate {
507 pub name: String,
508 pub template: String,
509 pub description: Option<String>,
510 pub version: Option<String>,
511 pub is_system: bool,
512}
513
514#[derive(Debug, Clone, Serialize, Deserialize)]
515pub struct UpdatePromptTemplate {
516 pub name: String,
517 pub template: String,
518 pub description: Option<String>,
519}
520
521#[async_trait]
522pub trait PromptTemplateStore: Send + Sync {
523 async fn list(&self) -> anyhow::Result<Vec<PromptTemplateRecord>>;
524 async fn get(&self, id: &str) -> anyhow::Result<Option<PromptTemplateRecord>>;
525 async fn create(&self, template: NewPromptTemplate) -> anyhow::Result<PromptTemplateRecord>;
526 async fn update(
527 &self,
528 id: &str,
529 update: UpdatePromptTemplate,
530 ) -> anyhow::Result<PromptTemplateRecord>;
531 async fn delete(&self, id: &str) -> anyhow::Result<()>;
532 async fn clone_template(&self, id: &str) -> anyhow::Result<PromptTemplateRecord>;
533 async fn sync_system_templates(&self, templates: Vec<NewPromptTemplate>) -> anyhow::Result<()>;
534}
535
536#[derive(Debug, Clone, Serialize, Deserialize)]
539pub struct SecretRecord {
540 pub id: String,
541 pub key: String,
542 pub value: String,
543 pub created_at: chrono::DateTime<chrono::Utc>,
544 pub updated_at: chrono::DateTime<chrono::Utc>,
545}
546
547#[derive(Debug, Clone, Serialize, Deserialize)]
548pub struct NewSecret {
549 pub key: String,
550 pub value: String,
551}
552
553#[async_trait]
554pub trait SecretStore: Send + Sync {
555 async fn list(&self) -> anyhow::Result<Vec<SecretRecord>>;
556 async fn get(&self, key: &str) -> anyhow::Result<Option<SecretRecord>>;
557 async fn create(&self, secret: NewSecret) -> anyhow::Result<SecretRecord>;
558 async fn update(&self, key: &str, value: &str) -> anyhow::Result<SecretRecord>;
559 async fn delete(&self, key: &str) -> anyhow::Result<()>;
560}