1use crate::{
2 ScratchpadEntry, ToolAuthStore, ToolResponse, configuration::PluginArtifact,
3 workflow::WorkflowStore,
4};
5use async_trait::async_trait;
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::Value;
8use std::{collections::HashMap, sync::Arc};
9use tokio::sync::oneshot;
10use uuid::Uuid;
11
12use crate::{
13 AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread, ThreadSummary,
14 UpdateThreadRequest, browser::BrowserSessionRecord,
15};
16
17#[derive(Clone)]
21pub struct InitializedStores {
22 pub session_store: Arc<dyn SessionStore>,
23 pub agent_store: Arc<dyn AgentStore>,
24 pub task_store: Arc<dyn TaskStore>,
25 pub thread_store: Arc<dyn ThreadStore>,
26 pub tool_auth_store: Arc<dyn ToolAuthStore>,
27 pub scratchpad_store: Arc<dyn ScratchpadStore>,
28 pub workflow_store: Arc<dyn WorkflowStore>,
29 pub memory_store: Option<Arc<dyn MemoryStore>>,
30 pub crawl_store: Option<Arc<dyn CrawlStore>>,
31 pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
32 pub plugin_store: Arc<dyn PluginCatalogStore>,
33 pub browser_session_store: Arc<dyn BrowserSessionStore>,
34}
35impl InitializedStores {
36 pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
37 self.tool_auth_store = tool_auth_store;
38 }
39
40 pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
41 self.external_tool_calls_store = store;
42 }
43
44 pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
45 self.session_store = session_store;
46 }
47
48 pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
49 self.agent_store = agent_store;
50 }
51
52 pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
53 self.task_store = task_store;
54 }
55
56 pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
57 self.thread_store = thread_store;
58 }
59
60 pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
61 self.scratchpad_store = scratchpad_store;
62 }
63
64 pub fn with_workflow_store(mut self, workflow_store: Arc<dyn WorkflowStore>) {
65 self.workflow_store = workflow_store;
66 }
67
68 pub fn with_plugin_store(&mut self, plugin_store: Arc<dyn PluginCatalogStore>) {
69 self.plugin_store = plugin_store;
70 }
71
72 pub fn set_browser_session_store(&mut self, store: Arc<dyn BrowserSessionStore>) {
73 self.browser_session_store = store;
74 }
75}
76
77impl std::fmt::Debug for InitializedStores {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 f.debug_struct("InitializedStores").finish()
80 }
81}
82
83#[async_trait::async_trait]
85pub trait SessionStore: Send + Sync + std::fmt::Debug {
86 async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;
87
88 async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;
89
90 async fn set_value_with_expiry(
91 &self,
92 namespace: &str,
93 key: &str,
94 value: &Value,
95 expiry: Option<chrono::DateTime<chrono::Utc>>,
96 ) -> anyhow::Result<()>;
97
98 async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;
99
100 async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;
101
102 async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;
103}
104#[async_trait::async_trait]
105pub trait SessionStoreExt: SessionStore {
106 async fn set<T: Serialize + Sync>(
107 &self,
108 namespace: &str,
109 key: &str,
110 value: &T,
111 ) -> anyhow::Result<()> {
112 self.set_value(namespace, key, &serde_json::to_value(value)?)
113 .await
114 }
115 async fn set_with_expiry<T: Serialize + Sync>(
116 &self,
117 namespace: &str,
118 key: &str,
119 value: &T,
120 expiry: Option<chrono::DateTime<chrono::Utc>>,
121 ) -> anyhow::Result<()> {
122 self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
123 .await
124 }
125 async fn get<T: DeserializeOwned + Sync>(
126 &self,
127 namespace: &str,
128 key: &str,
129 ) -> anyhow::Result<Option<T>> {
130 match self.get_value(namespace, key).await? {
131 Some(b) => Ok(Some(serde_json::from_value(b)?)),
132 None => Ok(None),
133 }
134 }
135}
136impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
137
138#[async_trait::async_trait]
139pub trait BrowserSessionStore: Send + Sync + std::fmt::Debug {
140 async fn save_session(&self, record: BrowserSessionRecord) -> anyhow::Result<()>;
141
142 async fn get_session(&self, user_id: &str) -> anyhow::Result<Option<BrowserSessionRecord>>;
143
144 async fn delete_session(&self, user_id: &str) -> anyhow::Result<()>;
145}
146
147#[async_trait::async_trait]
149pub trait MemoryStore: Send + Sync {
150 async fn store_memory(
152 &self,
153 user_id: &str,
154 session_memory: SessionMemory,
155 ) -> anyhow::Result<()>;
156
157 async fn search_memories(
159 &self,
160 user_id: &str,
161 query: &str,
162 limit: Option<usize>,
163 ) -> anyhow::Result<Vec<String>>;
164
165 async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;
167
168 async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
170}
171
172#[derive(Debug, Clone)]
173pub struct SessionMemory {
174 pub agent_id: String,
175 pub thread_id: String,
176 pub session_summary: String,
177 pub key_insights: Vec<String>,
178 pub important_facts: Vec<String>,
179 pub timestamp: chrono::DateTime<chrono::Utc>,
180}
181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
182#[serde(tag = "type", rename_all = "snake_case")]
183pub enum FilterMessageType {
184 Events,
185 Messages,
186 Artifacts,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct MessageFilter {
191 pub filter: Option<Vec<FilterMessageType>>,
192 pub limit: Option<usize>,
193 pub offset: Option<usize>,
194}
195
196#[async_trait]
198pub trait TaskStore: Send + Sync {
199 fn init_task(
200 &self,
201 context_id: &str,
202 task_id: Option<&str>,
203 status: Option<TaskStatus>,
204 ) -> Task {
205 let task_id = task_id.unwrap_or(&Uuid::new_v4().to_string()).to_string();
206 Task {
207 id: task_id,
208 status: status.unwrap_or(TaskStatus::Pending),
209 created_at: chrono::Utc::now().timestamp_millis(),
210 updated_at: chrono::Utc::now().timestamp_millis(),
211 thread_id: context_id.to_string(),
212 parent_task_id: None,
213 }
214 }
215 async fn get_or_create_task(
216 &self,
217 thread_id: &str,
218 task_id: &str,
219 ) -> Result<(), anyhow::Error> {
220 match self.get_task(&task_id).await? {
221 Some(task) => task,
222 None => {
223 self.create_task(&thread_id, Some(&task_id), Some(TaskStatus::Running))
224 .await?
225 }
226 };
227
228 Ok(())
229 }
230 async fn create_task(
231 &self,
232 context_id: &str,
233 task_id: Option<&str>,
234 task_status: Option<TaskStatus>,
235 ) -> anyhow::Result<Task>;
236 async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
237 async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
238 async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
239 async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
240 async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
241 async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
242
243 async fn get_history(
244 &self,
245 thread_id: &str,
246 filter: Option<MessageFilter>,
247 ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
248
249 async fn update_parent_task(
250 &self,
251 task_id: &str,
252 parent_task_id: Option<&str>,
253 ) -> anyhow::Result<()>;
254}
255
256#[derive(Debug, Clone)]
257pub struct PluginMetadataRecord {
258 pub package_name: String,
259 pub version: Option<String>,
260 pub object_prefix: String,
261 pub entrypoint: Option<String>,
262 pub artifact: PluginArtifact,
263 pub updated_at: chrono::DateTime<chrono::Utc>,
264}
265
266#[async_trait]
267pub trait PluginCatalogStore: Send + Sync {
268 async fn list_plugins(&self) -> anyhow::Result<Vec<PluginMetadataRecord>>;
269
270 async fn get_plugin(&self, package_name: &str) -> anyhow::Result<Option<PluginMetadataRecord>>;
271
272 async fn upsert_plugin(&self, record: &PluginMetadataRecord) -> anyhow::Result<()>;
273
274 async fn remove_plugin(&self, package_name: &str) -> anyhow::Result<()>;
275
276 async fn clear(&self) -> anyhow::Result<()>;
277}
278
279#[async_trait]
281pub trait ThreadStore: Send + Sync {
282 fn as_any(&self) -> &dyn std::any::Any;
283 async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
284 async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
285 async fn update_thread(
286 &self,
287 thread_id: &str,
288 request: UpdateThreadRequest,
289 ) -> anyhow::Result<Thread>;
290 async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;
291 async fn list_threads(
292 &self,
293 agent_id: Option<&str>,
294 limit: Option<u32>,
295 offset: Option<u32>,
296 attributes_filter: Option<&serde_json::Value>,
297 ) -> anyhow::Result<Vec<ThreadSummary>>;
298 async fn update_thread_with_message(
299 &self,
300 thread_id: &str,
301 message: &str,
302 ) -> anyhow::Result<()>;
303}
304
305#[async_trait]
306pub trait AgentStore: Send + Sync {
307 async fn list(
308 &self,
309 cursor: Option<String>,
310 limit: Option<usize>,
311 ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
312
313 async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
314 async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
315 async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
317
318 async fn clear(&self) -> anyhow::Result<()>;
319}
320
321#[async_trait::async_trait]
323pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
324 async fn add_entry(
326 &self,
327 thread_id: &str,
328 entry: ScratchpadEntry,
329 ) -> Result<(), crate::AgentError>;
330
331 async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;
333
334 async fn get_entries(
336 &self,
337 thread_id: &str,
338 task_id: &str,
339 limit: Option<usize>,
340 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
341
342 async fn get_all_entries(
343 &self,
344 thread_id: &str,
345 limit: Option<usize>,
346 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
347}
348
349#[derive(Debug, Clone, Serialize, Deserialize)]
351pub struct CrawlResult {
352 pub id: String,
353 pub url: String,
354 pub title: Option<String>,
355 pub content: String,
356 pub html: Option<String>,
357 pub metadata: serde_json::Value,
358 pub links: Vec<String>,
359 pub images: Vec<String>,
360 pub status_code: Option<u16>,
361 pub crawled_at: chrono::DateTime<chrono::Utc>,
362 pub processing_time_ms: Option<u64>,
363}
364
365#[async_trait]
367pub trait CrawlStore: Send + Sync {
368 async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;
370
371 async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;
373
374 async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;
376
377 async fn get_recent_crawl_results(
379 &self,
380 limit: Option<usize>,
381 since: Option<chrono::DateTime<chrono::Utc>>,
382 ) -> anyhow::Result<Vec<CrawlResult>>;
383
384 async fn is_url_recently_crawled(
386 &self,
387 url: &str,
388 cache_duration: chrono::Duration,
389 ) -> anyhow::Result<Option<CrawlResult>>;
390
391 async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;
393
394 async fn cleanup_old_results(
396 &self,
397 before: chrono::DateTime<chrono::Utc>,
398 ) -> anyhow::Result<usize>;
399}
400
401#[async_trait]
403pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
404 async fn register_external_tool_call(
406 &self,
407 session_id: &str,
408 ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;
409
410 async fn complete_external_tool_call(
412 &self,
413 session_id: &str,
414 tool_response: ToolResponse,
415 ) -> anyhow::Result<()>;
416
417 async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;
419
420 async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
422}