1use crate::{
2 ScratchpadEntry, ToolAuthStore, ToolResponse, configuration::PluginArtifact,
3 workflow::WorkflowStore,
4};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize, de::DeserializeOwned};
8use serde_json::Value;
9use std::{collections::HashMap, sync::Arc};
10use tokio::sync::oneshot;
11use uuid::Uuid;
12
13use crate::{
14 AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread,
15 UpdateThreadRequest,
16};
17
18#[derive(Debug, Clone, Default, Serialize, Deserialize)]
22pub struct ThreadListFilter {
23 pub agent_id: Option<String>,
25 pub external_id: Option<String>,
27 #[serde(skip_serializing_if = "Option::is_none")]
29 pub attributes: Option<serde_json::Value>,
30 pub search: Option<String>,
32 pub from_date: Option<DateTime<Utc>>,
34 pub to_date: Option<DateTime<Utc>>,
36 pub tags: Option<Vec<String>>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ThreadListResponse {
43 pub threads: Vec<crate::ThreadSummary>,
44 pub total: i64,
45 pub page: u32,
46 pub page_size: u32,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct AgentUsageInfo {
52 pub agent_id: String,
53 pub agent_name: String,
54 pub thread_count: i64,
55}
56
57#[derive(Clone)]
59pub struct InitializedStores {
60 pub session_store: Arc<dyn SessionStore>,
61 pub agent_store: Arc<dyn AgentStore>,
62 pub task_store: Arc<dyn TaskStore>,
63 pub thread_store: Arc<dyn ThreadStore>,
64 pub tool_auth_store: Arc<dyn ToolAuthStore>,
65 pub scratchpad_store: Arc<dyn ScratchpadStore>,
66 pub workflow_store: Arc<dyn WorkflowStore>,
67 pub memory_store: Option<Arc<dyn MemoryStore>>,
68 pub crawl_store: Option<Arc<dyn CrawlStore>>,
69 pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
70 pub plugin_store: Arc<dyn PluginCatalogStore>,
71 pub prompt_template_store: Option<Arc<dyn PromptTemplateStore>>,
72 pub secret_store: Option<Arc<dyn SecretStore>>,
73 pub plugin_tool_loader: Option<Arc<dyn PluginToolLoader>>,
77}
78impl InitializedStores {
79 pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
80 self.tool_auth_store = tool_auth_store;
81 }
82
83 pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
84 self.external_tool_calls_store = store;
85 }
86
87 pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
88 self.session_store = session_store;
89 }
90
91 pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
92 self.agent_store = agent_store;
93 }
94
95 pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
96 self.task_store = task_store;
97 }
98
99 pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
100 self.thread_store = thread_store;
101 }
102
103 pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
104 self.scratchpad_store = scratchpad_store;
105 }
106
107 pub fn with_workflow_store(mut self, workflow_store: Arc<dyn WorkflowStore>) {
108 self.workflow_store = workflow_store;
109 }
110
111 pub fn with_plugin_store(&mut self, plugin_store: Arc<dyn PluginCatalogStore>) {
112 self.plugin_store = plugin_store;
113 }
114}
115
116impl std::fmt::Debug for InitializedStores {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("InitializedStores").finish()
119 }
120}
121
122#[derive(Debug, Serialize, Deserialize, Clone)]
123pub struct SessionSummary {
124 pub session_id: String,
125 pub keys: Vec<String>,
126 pub key_count: usize,
127 pub updated_at: Option<DateTime<Utc>>,
128}
129
130#[async_trait::async_trait]
132pub trait SessionStore: Send + Sync + std::fmt::Debug {
133 async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;
134
135 async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;
136
137 async fn set_value_with_expiry(
138 &self,
139 namespace: &str,
140 key: &str,
141 value: &Value,
142 expiry: Option<chrono::DateTime<chrono::Utc>>,
143 ) -> anyhow::Result<()>;
144
145 async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;
146
147 async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;
148
149 async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;
150
151 async fn list_sessions(
152 &self,
153 namespace: Option<&str>,
154 limit: Option<usize>,
155 offset: Option<usize>,
156 ) -> anyhow::Result<Vec<SessionSummary>>;
157}
158#[async_trait::async_trait]
159pub trait SessionStoreExt: SessionStore {
160 async fn set<T: Serialize + Sync>(
161 &self,
162 namespace: &str,
163 key: &str,
164 value: &T,
165 ) -> anyhow::Result<()> {
166 self.set_value(namespace, key, &serde_json::to_value(value)?)
167 .await
168 }
169 async fn set_with_expiry<T: Serialize + Sync>(
170 &self,
171 namespace: &str,
172 key: &str,
173 value: &T,
174 expiry: Option<chrono::DateTime<chrono::Utc>>,
175 ) -> anyhow::Result<()> {
176 self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
177 .await
178 }
179 async fn get<T: DeserializeOwned + Sync>(
180 &self,
181 namespace: &str,
182 key: &str,
183 ) -> anyhow::Result<Option<T>> {
184 match self.get_value(namespace, key).await? {
185 Some(b) => Ok(Some(serde_json::from_value(b)?)),
186 None => Ok(None),
187 }
188 }
189}
190impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
191
192#[async_trait::async_trait]
194pub trait MemoryStore: Send + Sync {
195 async fn store_memory(
197 &self,
198 user_id: &str,
199 session_memory: SessionMemory,
200 ) -> anyhow::Result<()>;
201
202 async fn search_memories(
204 &self,
205 user_id: &str,
206 query: &str,
207 limit: Option<usize>,
208 ) -> anyhow::Result<Vec<String>>;
209
210 async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;
212
213 async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
215}
216
217#[derive(Debug, Clone)]
218pub struct SessionMemory {
219 pub agent_id: String,
220 pub thread_id: String,
221 pub session_summary: String,
222 pub key_insights: Vec<String>,
223 pub important_facts: Vec<String>,
224 pub timestamp: chrono::DateTime<chrono::Utc>,
225}
226#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
227#[serde(tag = "type", rename_all = "snake_case")]
228pub enum FilterMessageType {
229 Events,
230 Messages,
231 Artifacts,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
235pub struct MessageFilter {
236 pub filter: Option<Vec<FilterMessageType>>,
237 pub limit: Option<usize>,
238 pub offset: Option<usize>,
239}
240
241#[async_trait]
243pub trait TaskStore: Send + Sync {
244 fn init_task(
245 &self,
246 context_id: &str,
247 task_id: Option<&str>,
248 status: Option<TaskStatus>,
249 ) -> Task {
250 let task_id = task_id.unwrap_or(&Uuid::new_v4().to_string()).to_string();
251 Task {
252 id: task_id,
253 status: status.unwrap_or(TaskStatus::Pending),
254 created_at: chrono::Utc::now().timestamp_millis(),
255 updated_at: chrono::Utc::now().timestamp_millis(),
256 thread_id: context_id.to_string(),
257 parent_task_id: None,
258 }
259 }
260 async fn get_or_create_task(
261 &self,
262 thread_id: &str,
263 task_id: &str,
264 ) -> Result<(), anyhow::Error> {
265 match self.get_task(&task_id).await? {
266 Some(task) => task,
267 None => {
268 self.create_task(&thread_id, Some(&task_id), Some(TaskStatus::Running))
269 .await?
270 }
271 };
272
273 Ok(())
274 }
275 async fn create_task(
276 &self,
277 context_id: &str,
278 task_id: Option<&str>,
279 task_status: Option<TaskStatus>,
280 ) -> anyhow::Result<Task>;
281 async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
282 async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
283 async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
284 async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
285 async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
286 async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
287
288 async fn get_history(
289 &self,
290 thread_id: &str,
291 filter: Option<MessageFilter>,
292 ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
293
294 async fn update_parent_task(
295 &self,
296 task_id: &str,
297 parent_task_id: Option<&str>,
298 ) -> anyhow::Result<()>;
299}
300
301#[derive(Debug, Clone)]
302pub struct PluginMetadataRecord {
303 pub package_name: String,
304 pub version: Option<String>,
305 pub object_prefix: String,
306 pub entrypoint: Option<String>,
307 pub artifact: PluginArtifact,
308 pub updated_at: chrono::DateTime<chrono::Utc>,
309}
310
311#[async_trait]
312pub trait PluginCatalogStore: Send + Sync {
313 async fn list_plugins(&self) -> anyhow::Result<Vec<PluginMetadataRecord>>;
314
315 async fn get_plugin(&self, package_name: &str) -> anyhow::Result<Option<PluginMetadataRecord>>;
316
317 async fn upsert_plugin(&self, record: &PluginMetadataRecord) -> anyhow::Result<()>;
318
319 async fn remove_plugin(&self, package_name: &str) -> anyhow::Result<()>;
320
321 async fn clear(&self) -> anyhow::Result<()>;
322}
323
324#[async_trait]
326pub trait ThreadStore: Send + Sync {
327 fn as_any(&self) -> &dyn std::any::Any;
328 async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
329 async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
330 async fn update_thread(
331 &self,
332 thread_id: &str,
333 request: UpdateThreadRequest,
334 ) -> anyhow::Result<Thread>;
335 async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;
336
337 async fn list_threads(
340 &self,
341 filter: &ThreadListFilter,
342 limit: Option<u32>,
343 offset: Option<u32>,
344 ) -> anyhow::Result<ThreadListResponse>;
345
346 async fn update_thread_with_message(
347 &self,
348 thread_id: &str,
349 message: &str,
350 ) -> anyhow::Result<()>;
351
352 async fn get_home_stats(&self) -> anyhow::Result<HomeStats>;
354
355 async fn get_agents_by_usage(&self, search: Option<&str>) -> anyhow::Result<Vec<AgentUsageInfo>>;
359
360 async fn get_agent_stats_map(
362 &self,
363 ) -> anyhow::Result<std::collections::HashMap<String, AgentStatsInfo>>;
364
365 async fn mark_message_read(
369 &self,
370 thread_id: &str,
371 message_id: &str,
372 ) -> anyhow::Result<MessageReadStatus>;
373
374 async fn get_message_read_status(
376 &self,
377 thread_id: &str,
378 message_id: &str,
379 ) -> anyhow::Result<Option<MessageReadStatus>>;
380
381 async fn get_thread_read_status(
383 &self,
384 thread_id: &str,
385 ) -> anyhow::Result<Vec<MessageReadStatus>>;
386
387 async fn vote_message(&self, request: VoteMessageRequest) -> anyhow::Result<MessageVote>;
392
393 async fn remove_vote(&self, thread_id: &str, message_id: &str) -> anyhow::Result<()>;
395
396 async fn get_user_vote(
398 &self,
399 thread_id: &str,
400 message_id: &str,
401 ) -> anyhow::Result<Option<MessageVote>>;
402
403 async fn get_message_vote_summary(
405 &self,
406 thread_id: &str,
407 message_id: &str,
408 ) -> anyhow::Result<MessageVoteSummary>;
409
410 async fn get_message_votes(
412 &self,
413 thread_id: &str,
414 message_id: &str,
415 ) -> anyhow::Result<Vec<MessageVote>>;
416}
417
418#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
420pub struct HomeStats {
421 pub total_agents: i64,
422 pub total_threads: i64,
423 pub total_messages: i64,
424 pub avg_run_time_ms: Option<f64>,
425 #[serde(skip_serializing_if = "Option::is_none")]
427 pub total_owned_agents: Option<i64>,
428 #[serde(skip_serializing_if = "Option::is_none")]
429 pub total_accessible_agents: Option<i64>,
430 #[serde(skip_serializing_if = "Option::is_none")]
431 pub most_active_agent: Option<MostActiveAgent>,
432 #[serde(skip_serializing_if = "Option::is_none")]
433 pub latest_threads: Option<Vec<LatestThreadInfo>>,
434 #[serde(skip_serializing_if = "Option::is_none")]
436 pub recently_used_agents: Option<Vec<RecentlyUsedAgent>>,
437 #[serde(skip_serializing_if = "Option::is_none")]
440 pub custom_metrics: Option<std::collections::HashMap<String, CustomMetric>>,
441}
442
443#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
445pub struct CustomMetric {
446 pub label: String,
448 pub value: String,
450 #[serde(skip_serializing_if = "Option::is_none")]
452 pub helper: Option<String>,
453 #[serde(skip_serializing_if = "Option::is_none")]
455 pub limit: Option<String>,
456 #[serde(skip_serializing_if = "Option::is_none")]
458 pub raw_value: Option<i64>,
459 #[serde(skip_serializing_if = "Option::is_none")]
461 pub raw_limit: Option<i64>,
462}
463
464#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
465pub struct MostActiveAgent {
466 pub id: String,
467 pub name: String,
468 pub thread_count: i64,
469}
470
471#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
473pub struct RecentlyUsedAgent {
474 pub id: String,
475 pub name: String,
476 pub description: Option<String>,
477 pub last_used_at: chrono::DateTime<chrono::Utc>,
478}
479
480#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
481pub struct LatestThreadInfo {
482 pub id: String,
483 pub title: String,
484 pub agent_id: String,
485 pub agent_name: String,
486 pub updated_at: chrono::DateTime<chrono::Utc>,
487}
488
489#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
491pub struct AgentStatsInfo {
492 pub thread_count: i64,
493 pub sub_agent_usage_count: i64,
494 pub last_used_at: Option<chrono::DateTime<chrono::Utc>>,
495}
496
497#[async_trait]
498pub trait AgentStore: Send + Sync {
499 async fn list(
500 &self,
501 cursor: Option<String>,
502 limit: Option<usize>,
503 ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
504
505 async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
506 async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
507 async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
509
510 async fn clear(&self) -> anyhow::Result<()>;
511}
512
513#[async_trait::async_trait]
515pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
516 async fn add_entry(
518 &self,
519 thread_id: &str,
520 entry: ScratchpadEntry,
521 ) -> Result<(), crate::AgentError>;
522
523 async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;
525
526 async fn get_entries(
528 &self,
529 thread_id: &str,
530 task_id: &str,
531 limit: Option<usize>,
532 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
533
534 async fn get_all_entries(
535 &self,
536 thread_id: &str,
537 limit: Option<usize>,
538 ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
539}
540
541#[derive(Debug, Clone, Serialize, Deserialize)]
543pub struct CrawlResult {
544 pub id: String,
545 pub url: String,
546 pub title: Option<String>,
547 pub content: String,
548 pub html: Option<String>,
549 pub metadata: serde_json::Value,
550 pub links: Vec<String>,
551 pub images: Vec<String>,
552 pub status_code: Option<u16>,
553 pub crawled_at: chrono::DateTime<chrono::Utc>,
554 pub processing_time_ms: Option<u64>,
555}
556
557#[async_trait]
559pub trait CrawlStore: Send + Sync {
560 async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;
562
563 async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;
565
566 async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;
568
569 async fn get_recent_crawl_results(
571 &self,
572 limit: Option<usize>,
573 since: Option<chrono::DateTime<chrono::Utc>>,
574 ) -> anyhow::Result<Vec<CrawlResult>>;
575
576 async fn is_url_recently_crawled(
578 &self,
579 url: &str,
580 cache_duration: chrono::Duration,
581 ) -> anyhow::Result<Option<CrawlResult>>;
582
583 async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;
585
586 async fn cleanup_old_results(
588 &self,
589 before: chrono::DateTime<chrono::Utc>,
590 ) -> anyhow::Result<usize>;
591}
592
593#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
597#[serde(rename_all = "lowercase")]
598pub enum VoteType {
599 Upvote,
600 Downvote,
601}
602
603#[derive(Debug, Clone, Serialize, Deserialize)]
605pub struct MessageReadStatus {
606 pub thread_id: String,
607 pub message_id: String,
608 pub user_id: String,
609 pub read_at: chrono::DateTime<chrono::Utc>,
610}
611
612#[derive(Debug, Clone, Serialize, Deserialize)]
614pub struct MarkMessageReadRequest {
615 pub thread_id: String,
616 pub message_id: String,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize)]
621pub struct MessageVote {
622 pub id: String,
623 pub thread_id: String,
624 pub message_id: String,
625 pub user_id: String,
626 pub vote_type: VoteType,
627 pub comment: Option<String>,
629 pub created_at: chrono::DateTime<chrono::Utc>,
630 pub updated_at: chrono::DateTime<chrono::Utc>,
631}
632
633#[derive(Debug, Clone, Serialize, Deserialize)]
635pub struct VoteMessageRequest {
636 pub thread_id: String,
637 pub message_id: String,
638 pub vote_type: VoteType,
639 pub comment: Option<String>,
641}
642
643#[derive(Debug, Clone, Serialize, Deserialize, Default)]
645pub struct MessageVoteSummary {
646 pub message_id: String,
647 pub upvotes: i64,
648 pub downvotes: i64,
649 pub user_vote: Option<VoteType>,
651}
652
653#[async_trait]
655pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
656 async fn register_external_tool_call(
658 &self,
659 session_id: &str,
660 ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;
661
662 async fn complete_external_tool_call(
664 &self,
665 session_id: &str,
666 tool_response: ToolResponse,
667 ) -> anyhow::Result<()>;
668
669 async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;
671
672 async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
674}
675
676#[derive(Debug, Clone, Serialize, Deserialize)]
679pub struct PromptTemplateRecord {
680 pub id: String,
681 pub name: String,
682 pub template: String,
683 pub description: Option<String>,
684 pub version: Option<String>,
685 pub is_system: bool,
686 pub created_at: chrono::DateTime<chrono::Utc>,
687 pub updated_at: chrono::DateTime<chrono::Utc>,
688}
689
690#[derive(Debug, Clone, Serialize, Deserialize)]
691pub struct NewPromptTemplate {
692 pub name: String,
693 pub template: String,
694 pub description: Option<String>,
695 pub version: Option<String>,
696 #[serde(default)]
697 pub is_system: bool,
698}
699
700#[derive(Debug, Clone, Serialize, Deserialize)]
701pub struct UpdatePromptTemplate {
702 pub name: String,
703 pub template: String,
704 pub description: Option<String>,
705}
706
707#[async_trait]
708pub trait PromptTemplateStore: Send + Sync {
709 async fn list(&self) -> anyhow::Result<Vec<PromptTemplateRecord>>;
710 async fn get(&self, id: &str) -> anyhow::Result<Option<PromptTemplateRecord>>;
711 async fn create(&self, template: NewPromptTemplate) -> anyhow::Result<PromptTemplateRecord>;
712 async fn update(
713 &self,
714 id: &str,
715 update: UpdatePromptTemplate,
716 ) -> anyhow::Result<PromptTemplateRecord>;
717 async fn delete(&self, id: &str) -> anyhow::Result<()>;
718 async fn clone_template(&self, id: &str) -> anyhow::Result<PromptTemplateRecord>;
719 async fn sync_system_templates(&self, templates: Vec<NewPromptTemplate>) -> anyhow::Result<()>;
720}
721
722#[async_trait]
731pub trait PluginToolLoader: Send + Sync + std::fmt::Debug {
732 async fn list_packages(&self) -> anyhow::Result<Vec<String>>;
734
735 async fn get_package_tools(&self, package_name: &str) -> anyhow::Result<Vec<Arc<dyn crate::Tool>>>;
737
738 async fn has_package(&self, package_name: &str) -> anyhow::Result<bool>;
740}
741
742#[derive(Debug, Clone, Serialize, Deserialize)]
745pub struct SecretRecord {
746 pub id: String,
747 pub key: String,
748 pub value: String,
749 pub created_at: chrono::DateTime<chrono::Utc>,
750 pub updated_at: chrono::DateTime<chrono::Utc>,
751}
752
753#[derive(Debug, Clone, Serialize, Deserialize)]
754pub struct NewSecret {
755 pub key: String,
756 pub value: String,
757}
758
759#[async_trait]
760pub trait SecretStore: Send + Sync {
761 async fn list(&self) -> anyhow::Result<Vec<SecretRecord>>;
762 async fn get(&self, key: &str) -> anyhow::Result<Option<SecretRecord>>;
763 async fn create(&self, secret: NewSecret) -> anyhow::Result<SecretRecord>;
764 async fn update(&self, key: &str, value: &str) -> anyhow::Result<SecretRecord>;
765 async fn delete(&self, key: &str) -> anyhow::Result<()>;
766}