Skip to main content

distri_types/
stores.rs

1use crate::connections::{Connection, ConnectionStatus, ConnectionToken, NewConnection};
2use crate::{ScratchpadEntry, ToolAuthStore, ToolResponse};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::Value;
8use std::{collections::HashMap, sync::Arc};
9use tokio::sync::oneshot;
10use utoipa::ToSchema;
11use uuid::Uuid;
12
13use crate::{
14    AgentEvent, CreateThreadRequest, Message, Task, TaskMessage, TaskStatus, Thread,
15    UpdateThreadRequest,
16};
17
/// Inputs to [`TaskStore::create_task`]. The schema records only
/// (`remote`, `inner_task_id`, `ended_at`, `invocation`) for the
/// Invocation half — `remote` is the fast filter, the typed Executor +
/// RunnerConfig live inside `invocation`. Adding a new runner kind
/// (k8s, fly, …) does NOT require a schema migration; ship a new
/// [`RunnerInitializer`] keyed by [`RunnerConfig::kind`].
///
/// Use [`CreateTaskInput::local`] for a local-executor task; chain
/// [`CreateTaskInput::with_remote`] to flip `remote` to true and set
/// `inner_task_id` together.
#[derive(Debug, Clone)]
pub struct CreateTaskInput {
    /// Id of the thread the task belongs to; copied into `Task::thread_id`
    /// by the default [`TaskStore::init_task`].
    pub thread_id: String,
    /// Explicit task id. When `None`, the default [`TaskStore::init_task`]
    /// generates a fresh UUID v4 string.
    pub task_id: Option<String>,
    /// Initial status. When `None`, the default [`TaskStore::init_task`]
    /// falls back to `TaskStatus::Pending`.
    pub status: Option<TaskStatus>,
    /// Id of the parent task, if any; copied into `Task::parent_task_id`
    /// by the default [`TaskStore::init_task`].
    pub parent_task_id: Option<String>,
    /// `true` when another orchestrator runs the loop. Equivalent to
    /// `Executor::Remote { runner }` in the in-memory `Invocation`. The
    /// runner kind/config is NOT denormalized into the schema — it
    /// lives only in the `invocation` blob, dispatched at runtime via
    /// the `RunnerInitializer` registry.
    pub remote: bool,
    /// task_id on the inner orchestrator. Must be `None` when
    /// `remote == false` (DB CHECK enforces). May be `None`
    /// transiently for remote rows — between row insert and the
    /// runner assigning its inner id.
    pub inner_task_id: Option<String>,
    /// Serialized [`Invocation`](crate::invocation::Invocation). The
    /// canonical record of what was requested, including the typed
    /// `Executor` and any `RunnerConfig`. Stored as JSONB in Pg /
    /// TEXT in sqlite. Default is `{}` until invoke() is wired.
    pub invocation: serde_json::Value,
}
51
52impl CreateTaskInput {
53    /// Local-executor task. `task_id` / `status` / `parent_task_id` /
54    /// `invocation` are chained via the `with_*` builders.
55    pub fn local(thread_id: impl Into<String>) -> Self {
56        Self {
57            thread_id: thread_id.into(),
58            task_id: None,
59            status: None,
60            parent_task_id: None,
61            remote: false,
62            inner_task_id: None,
63            invocation: serde_json::Value::Object(Default::default()),
64        }
65    }
66
67    pub fn with_id(mut self, task_id: impl Into<String>) -> Self {
68        self.task_id = Some(task_id.into());
69        self
70    }
71
72    pub fn with_status(mut self, status: TaskStatus) -> Self {
73        self.status = Some(status);
74        self
75    }
76
77    pub fn with_parent(mut self, parent_task_id: impl Into<String>) -> Self {
78        self.parent_task_id = Some(parent_task_id.into());
79        self
80    }
81
82    pub fn with_invocation(mut self, invocation: serde_json::Value) -> Self {
83        self.invocation = invocation;
84        self
85    }
86
87    /// Marks the task as remote-executed and sets the inner task id
88    /// the runner has assigned. The runner kind + its private config
89    /// live in `invocation` (typed `Executor::Remote { runner }`).
90    pub fn with_remote(mut self, inner_task_id: impl Into<String>) -> Self {
91        self.remote = true;
92        self.inner_task_id = Some(inner_task_id.into());
93        self
94    }
95}
96
97// Redis and PostgreSQL stores moved to distri-stores crate
98
/// Filter for listing threads.
///
/// Passed to [`ThreadStore::list_threads`]. Every field is optional and the
/// derived `Default` leaves all of them `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct ThreadListFilter {
    /// Filter by agent ID
    pub agent_id: Option<String>,
    /// Filter by external ID (for integration with external systems)
    pub external_id: Option<String>,
    /// Filter by thread attributes (JSON matching).
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub attributes: Option<serde_json::Value>,
    /// Full-text search across title and last_message
    pub search: Option<String>,
    /// Filter threads updated after this time
    pub from_date: Option<DateTime<Utc>>,
    /// Filter threads updated before this time
    pub to_date: Option<DateTime<Utc>>,
    /// Filter by tags (array of tag strings to match)
    // NOTE(review): whether a thread must match all tags or any tag is
    // decided by the store implementation — confirm there.
    pub tags: Option<Vec<String>>,
}
118
/// Paginated response for thread listing, as returned by
/// [`ThreadStore::list_threads`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct ThreadListResponse {
    /// Thread summaries contained in this response.
    pub threads: Vec<crate::ThreadSummary>,
    /// Total count reported by the store.
    // NOTE(review): presumably the count of all threads matching the
    // filter, not just this page — confirm against implementations.
    pub total: i64,
    /// Page indicator.
    // NOTE(review): zero- vs one-based is not visible here; confirm.
    pub page: u32,
    /// Page size used for this response.
    pub page_size: u32,
}
127
/// Agent usage information for sorting agents by thread count.
///
/// Element type of the list returned by [`ThreadStore::get_agents_by_usage`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct AgentUsageInfo {
    /// Identifier of the agent.
    pub agent_id: String,
    /// Name of the agent.
    pub agent_name: String,
    /// Thread count used for sorting.
    pub thread_count: i64,
}
135
/// Initialized store collection.
///
/// Bundles shared (`Arc`) handles to every store trait object. Fields typed
/// `Option<...>` may be absent; the remaining stores are always present.
/// Cloning the collection clones the `Arc` handles, not the stores.
#[derive(Clone)]
pub struct InitializedStores {
    pub session_store: Arc<dyn SessionStore>,
    pub agent_store: Arc<dyn AgentStore>,
    pub task_store: Arc<dyn TaskStore>,
    pub thread_store: Arc<dyn ThreadStore>,
    pub tool_auth_store: Arc<dyn ToolAuthStore>,
    pub scratchpad_store: Arc<dyn ScratchpadStore>,
    pub memory_store: Option<Arc<dyn MemoryStore>>,
    pub crawl_store: Option<Arc<dyn CrawlStore>>,
    pub external_tool_calls_store: Arc<dyn ExternalToolCallsStore>,
    pub prompt_template_store: Option<Arc<dyn PromptTemplateStore>>,
    pub secret_store: Option<Arc<dyn SecretStore>>,
    pub skill_store: Option<Arc<dyn SkillStore>>,
    pub connection_store: Option<Arc<dyn ConnectionStore>>,
    pub connection_token_store: Option<Arc<dyn ConnectionTokenStore>>,
    pub provider_registry: Option<Arc<dyn crate::auth::ProviderRegistry>>,
    pub span_store: Option<Arc<dyn SpanStore>>,
    pub note_store: Option<Arc<dyn NoteStore>>,
    /// Provider settings store (`/v1/providers` routes). `None` for the
    /// multi-tenant cloud, which registers a workspace-scoped `ProviderStore`
    /// separately rather than through `InitializedStores`.
    pub provider_store: Option<Arc<dyn ProviderStore>>,
}
161impl InitializedStores {
162    pub fn set_tool_auth_store(&mut self, tool_auth_store: Arc<dyn ToolAuthStore>) {
163        self.tool_auth_store = tool_auth_store;
164    }
165
166    pub fn set_external_tool_calls_store(mut self, store: Arc<dyn ExternalToolCallsStore>) {
167        self.external_tool_calls_store = store;
168    }
169
170    pub fn set_session_store(&mut self, session_store: Arc<dyn SessionStore>) {
171        self.session_store = session_store;
172    }
173
174    pub fn set_agent_store(&mut self, agent_store: Arc<dyn AgentStore>) {
175        self.agent_store = agent_store;
176    }
177
178    pub fn with_task_store(&mut self, task_store: Arc<dyn TaskStore>) {
179        self.task_store = task_store;
180    }
181
182    pub fn with_thread_store(&mut self, thread_store: Arc<dyn ThreadStore>) {
183        self.thread_store = thread_store;
184    }
185
186    pub fn with_scratchpad_store(&mut self, scratchpad_store: Arc<dyn ScratchpadStore>) {
187        self.scratchpad_store = scratchpad_store;
188    }
189}
190
191impl std::fmt::Debug for InitializedStores {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("InitializedStores").finish()
194    }
195}
196
/// Summary of one stored session, as returned by
/// [`SessionStore::list_sessions`].
#[derive(Debug, Serialize, Deserialize, Clone, ToSchema, JsonSchema)]
pub struct SessionSummary {
    /// Identifier of the session.
    pub session_id: String,
    /// Keys stored in the session.
    pub keys: Vec<String>,
    /// Number of keys.
    // NOTE(review): presumably equal to `keys.len()` — confirm in stores.
    pub key_count: usize,
    /// Last update time, when the store tracks one.
    pub updated_at: Option<DateTime<Utc>>,
}
204
/// SessionStore trait - manages current conversation thread/run.
///
/// Values are JSON (`serde_json::Value`) addressed by a `namespace` plus a
/// `key`. Typed convenience wrappers live in [`SessionStoreExt`].
#[async_trait::async_trait]
pub trait SessionStore: Send + Sync + std::fmt::Debug {
    /// Clears the session identified by `namespace`.
    async fn clear_session(&self, namespace: &str) -> anyhow::Result<()>;

    /// Stores `value` under `key` in `namespace`.
    async fn set_value(&self, namespace: &str, key: &str, value: &Value) -> anyhow::Result<()>;

    /// Stores `value` under `key` in `namespace` with an optional expiry
    /// timestamp.
    // NOTE(review): presumably `None` means "never expires" — confirm.
    async fn set_value_with_expiry(
        &self,
        namespace: &str,
        key: &str,
        value: &Value,
        expiry: Option<chrono::DateTime<chrono::Utc>>,
    ) -> anyhow::Result<()>;

    /// Reads the value under `key` in `namespace`; `Ok(None)` when absent.
    async fn get_value(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Value>>;

    /// Deletes the value under `key` in `namespace`.
    async fn delete_value(&self, namespace: &str, key: &str) -> anyhow::Result<()>;

    /// Returns every key/value pair stored in `namespace`.
    async fn get_all_values(&self, namespace: &str) -> anyhow::Result<HashMap<String, Value>>;

    /// Lists session summaries, optionally restricted to `namespace` and
    /// paginated with `limit` / `offset`.
    async fn list_sessions(
        &self,
        namespace: Option<&str>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> anyhow::Result<Vec<SessionSummary>>;
}
233#[async_trait::async_trait]
234pub trait SessionStoreExt: SessionStore {
235    async fn set<T: Serialize + Sync>(
236        &self,
237        namespace: &str,
238        key: &str,
239        value: &T,
240    ) -> anyhow::Result<()> {
241        self.set_value(namespace, key, &serde_json::to_value(value)?)
242            .await
243    }
244    async fn set_with_expiry<T: Serialize + Sync>(
245        &self,
246        namespace: &str,
247        key: &str,
248        value: &T,
249        expiry: Option<chrono::DateTime<chrono::Utc>>,
250    ) -> anyhow::Result<()> {
251        self.set_value_with_expiry(namespace, key, &serde_json::to_value(value)?, expiry)
252            .await
253    }
254    async fn get<T: DeserializeOwned + Sync>(
255        &self,
256        namespace: &str,
257        key: &str,
258    ) -> anyhow::Result<Option<T>> {
259        match self.get_value(namespace, key).await? {
260            Some(b) => Ok(Some(serde_json::from_value(b)?)),
261            None => Ok(None),
262        }
263    }
264}
265impl<T: SessionStore + ?Sized> SessionStoreExt for T {}
266
/// Higher-level MemoryStore trait - manages cross-session permanent memory
/// using user_id.
#[async_trait::async_trait]
pub trait MemoryStore: Send + Sync {
    /// Store permanent memory from a session for cross-session access
    async fn store_memory(
        &self,
        user_id: &str,
        session_memory: SessionMemory,
    ) -> anyhow::Result<()>;

    /// Search for relevant memories across sessions for a user.
    ///
    /// `limit` optionally bounds the number of results; memories are
    /// returned as plain strings.
    // NOTE(review): ranking/matching strategy for `query` is defined by
    // the implementation — not visible here.
    async fn search_memories(
        &self,
        user_id: &str,
        query: &str,
        limit: Option<usize>,
    ) -> anyhow::Result<Vec<String>>;

    /// Get all permanent memories for a user
    async fn get_user_memories(&self, user_id: &str) -> anyhow::Result<Vec<String>>;

    /// Clear all memories for a user
    async fn clear_user_memories(&self, user_id: &str) -> anyhow::Result<()>;
}
291
/// Memory extracted from one session; the payload accepted by
/// [`MemoryStore::store_memory`].
#[derive(Debug, Clone)]
pub struct SessionMemory {
    /// Agent the session belonged to.
    pub agent_id: String,
    /// Thread the session belonged to.
    pub thread_id: String,
    /// Free-text summary of the session.
    pub session_summary: String,
    /// Key insights captured from the session.
    pub key_insights: Vec<String>,
    /// Important facts captured from the session.
    pub important_facts: Vec<String>,
    /// Timestamp attached to this memory.
    // NOTE(review): creation time vs session end time is not visible here.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Kinds of history items selectable through [`MessageFilter::filter`].
///
/// Serialized internally tagged under the `type` key with snake_case
/// variant names (per the serde attributes below).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, ToSchema, JsonSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FilterMessageType {
    Events,
    Messages,
    Artifacts,
}
308
/// Optional filter accepted by [`TaskStore::get_history`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct MessageFilter {
    /// Item kinds to select.
    // NOTE(review): presumably `None` means "no type filtering" — confirm.
    pub filter: Option<Vec<FilterMessageType>>,
    /// Optional cap on the number of returned items.
    pub limit: Option<usize>,
    /// Optional number of items to skip.
    pub offset: Option<usize>,
}
315
316// Task Store trait for A2A task management
317#[async_trait]
318pub trait TaskStore: Send + Sync {
319    /// Build (but do not persist) a Task row from a [`CreateTaskInput`]. The
320    /// returned struct is what the implementations will hand back from
321    /// `create_task` once the row has been inserted.
322    fn init_task(&self, input: &CreateTaskInput) -> Task {
323        let task_id = input
324            .task_id
325            .clone()
326            .unwrap_or_else(|| Uuid::new_v4().to_string());
327        Task {
328            id: task_id,
329            status: input.status.clone().unwrap_or(TaskStatus::Pending),
330            created_at: chrono::Utc::now().timestamp_millis(),
331            updated_at: chrono::Utc::now().timestamp_millis(),
332            thread_id: input.thread_id.clone(),
333            parent_task_id: input.parent_task_id.clone(),
334        }
335    }
336
337    async fn get_or_create_task(
338        &self,
339        thread_id: &str,
340        task_id: &str,
341    ) -> Result<(), anyhow::Error> {
342        match self.get_task(task_id).await? {
343            Some(task) => task,
344            None => {
345                self.create_task(
346                    CreateTaskInput::local(thread_id)
347                        .with_id(task_id)
348                        .with_status(TaskStatus::Running),
349                )
350                .await?
351            }
352        };
353
354        Ok(())
355    }
356
357    async fn create_task(&self, input: CreateTaskInput) -> anyhow::Result<Task>;
358    async fn get_task(&self, task_id: &str) -> anyhow::Result<Option<Task>>;
359    async fn update_task_status(&self, task_id: &str, status: TaskStatus) -> anyhow::Result<()>;
360    async fn add_event_to_task(&self, task_id: &str, event: AgentEvent) -> anyhow::Result<()>;
361    async fn add_message_to_task(&self, task_id: &str, message: &Message) -> anyhow::Result<()>;
362    async fn cancel_task(&self, task_id: &str) -> anyhow::Result<Task>;
363
364    /// Cancel `root_task_id` and every task whose `parent_task_id` chain
365    /// leads back to it, in one transaction.
366    ///
367    /// Idempotent on terminal rows: tasks already in `Completed`, `Failed`,
368    /// or `Canceled` are left untouched. The returned `Vec<Task>` contains
369    /// the rows that were actually transitioned to `Canceled` — the caller
370    /// uses this to publish corresponding cancel events on the broadcaster
371    /// so live in-process loops can stop.
372    ///
373    /// The cascade is implemented via a recursive CTE on the `parent_task_id`
374    /// edge; the `idx_tasks_parent_id` index keeps the walk cheap.
375    async fn cancel_task_cascade(&self, root_task_id: &str) -> anyhow::Result<Vec<Task>>;
376
377    /// Read-only walk of the parent_task_id graph rooted at `root_task_id`,
378    /// returning the root + every descendant. Used by the `list_my_tasks`
379    /// supervisor tool when scoped to a sub-tree, and by `wait_task` to
380    /// wait on the whole sub-tree of a Detached invocation.
381    ///
382    /// Order is breadth-first by descendant depth (root first); within a
383    /// level the order is implementation-defined.
384    async fn list_descendant_tasks(&self, root_task_id: &str) -> anyhow::Result<Vec<Task>>;
385
386    /// All non-terminal tasks. When `thread_id` is `Some`, scopes to that
387    /// thread (inputs of `list_my_tasks` from a thread-scoped supervisor);
388    /// otherwise returns every running task visible to the caller (cloud
389    /// tenant isolation still applies).
390    ///
391    /// "Running" here means the schema status `running` — tasks in `pending`,
392    /// `input_required`, or terminal states are excluded. The partial index
393    /// `idx_tasks_running` covers this query.
394    async fn list_running_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
395    async fn list_tasks(&self, thread_id: Option<&str>) -> anyhow::Result<Vec<Task>>;
396
397    async fn get_history(
398        &self,
399        thread_id: &str,
400        filter: Option<MessageFilter>,
401    ) -> anyhow::Result<Vec<(Task, Vec<TaskMessage>)>>;
402
403    async fn update_parent_task(
404        &self,
405        task_id: &str,
406        parent_task_id: Option<&str>,
407    ) -> anyhow::Result<()>;
408}
409
/// Thread Store trait for thread management.
///
/// Besides thread CRUD and listing, it exposes dashboard statistics and the
/// per-message read-status and voting operations.
#[async_trait]
pub trait ThreadStore: Send + Sync {
    /// Returns `self` as `&dyn Any`.
    // NOTE(review): presumably for downcasting to a concrete store — confirm.
    fn as_any(&self) -> &dyn std::any::Any;
    /// Creates a thread from `request` and returns it.
    async fn create_thread(&self, request: CreateThreadRequest) -> anyhow::Result<Thread>;
    /// Looks up a thread by id; `Ok(None)` when it does not exist.
    async fn get_thread(&self, thread_id: &str) -> anyhow::Result<Option<Thread>>;
    /// Applies `request` to the thread and returns the updated thread.
    async fn update_thread(
        &self,
        thread_id: &str,
        request: UpdateThreadRequest,
    ) -> anyhow::Result<Thread>;
    /// Deletes the thread with the given id.
    async fn delete_thread(&self, thread_id: &str) -> anyhow::Result<()>;

    /// List threads with pagination and filtering
    /// Returns a paginated response with total count
    async fn list_threads(
        &self,
        filter: &ThreadListFilter,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> anyhow::Result<ThreadListResponse>;

    /// Updates the thread using the text of a message.
    // NOTE(review): presumably refreshes the thread's last-message /
    // updated-at fields — confirm against implementations.
    async fn update_thread_with_message(
        &self,
        thread_id: &str,
        message: &str,
    ) -> anyhow::Result<()>;

    /// Persist the latest `ContextBudget` snapshot (as a pre-serialized JSON
    /// value) on the thread row. Written by the orchestrator's task relay on
    /// every `ContextBudgetUpdate` event so non-live surfaces can render the
    /// breakdown. Default impl is a no-op so tests / in-memory stores don't
    /// need to care.
    async fn update_last_context_budget(
        &self,
        thread_id: &str,
        budget: Option<serde_json::Value>,
    ) -> anyhow::Result<()> {
        // Mark both arguments as used; the default does nothing with them.
        let _ = (thread_id, budget);
        Ok(())
    }

    /// Get aggregated home statistics
    async fn get_home_stats(&self) -> anyhow::Result<HomeStats>;

    /// Get agents sorted by thread count (most active first)
    /// Includes all registered agents (even those with 0 threads).
    /// Optionally filters by name using a search string.
    async fn get_agents_by_usage(
        &self,
        search: Option<&str>,
    ) -> anyhow::Result<Vec<AgentUsageInfo>>;

    /// Get a map of agent name -> stats for all agents with activity
    async fn get_agent_stats_map(
        &self,
    ) -> anyhow::Result<std::collections::HashMap<String, AgentStatsInfo>>;

    // ========== Message Read Status Methods ==========

    /// Mark a message as read by the current user
    async fn mark_message_read(
        &self,
        thread_id: &str,
        message_id: &str,
    ) -> anyhow::Result<MessageReadStatus>;

    /// Get read status for a specific message
    async fn get_message_read_status(
        &self,
        thread_id: &str,
        message_id: &str,
    ) -> anyhow::Result<Option<MessageReadStatus>>;

    /// Get read status for all messages in a thread for the current user
    async fn get_thread_read_status(
        &self,
        thread_id: &str,
    ) -> anyhow::Result<Vec<MessageReadStatus>>;

    // ========== Message Voting Methods ==========

    /// Vote on a message (upvote or downvote)
    /// For downvotes, a comment is required
    async fn vote_message(&self, request: VoteMessageRequest) -> anyhow::Result<MessageVote>;

    /// Remove a vote from a message
    async fn remove_vote(&self, thread_id: &str, message_id: &str) -> anyhow::Result<()>;

    /// Get the current user's vote on a message
    async fn get_user_vote(
        &self,
        thread_id: &str,
        message_id: &str,
    ) -> anyhow::Result<Option<MessageVote>>;

    /// Get vote summary for a message (counts + current user's vote)
    async fn get_message_vote_summary(
        &self,
        thread_id: &str,
        message_id: &str,
    ) -> anyhow::Result<MessageVoteSummary>;

    /// Get all votes for a message (admin/analytics use)
    async fn get_message_votes(
        &self,
        thread_id: &str,
        message_id: &str,
    ) -> anyhow::Result<Vec<MessageVote>>;
}
520
/// Home statistics for dashboard, as returned by
/// [`ThreadStore::get_home_stats`].
///
/// The optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct HomeStats {
    /// Total number of agents.
    pub total_agents: i64,
    /// Total number of threads.
    pub total_threads: i64,
    /// Total number of messages.
    pub total_messages: i64,
    /// Average run time in milliseconds, when available.
    // NOTE(review): what counts as a "run" is defined by the store — confirm.
    pub avg_run_time_ms: Option<f64>,
    // Cloud-specific fields (optional)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_owned_agents: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_accessible_agents: Option<i64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub most_active_agent: Option<MostActiveAgent>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latest_threads: Option<Vec<LatestThreadInfo>>,
    /// Recently used agents (last 10 by most recent thread activity)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub recently_used_agents: Option<Vec<RecentlyUsedAgent>>,
    /// Custom metrics that can be displayed in the stats overview
    /// Key is the metric name (e.g., "usage"), value is the metric data
    #[serde(skip_serializing_if = "Option::is_none")]
    pub custom_metrics: Option<std::collections::HashMap<String, CustomMetric>>,
}
545
/// A custom metric for display in the stats overview.
///
/// Stored in [`HomeStats::custom_metrics`], keyed by metric name. All
/// optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct CustomMetric {
    /// Display label (e.g., "Monthly Calls")
    pub label: String,
    /// Current value as a string (formatted)
    pub value: String,
    /// Optional helper text below the value
    #[serde(skip_serializing_if = "Option::is_none")]
    pub helper: Option<String>,
    /// Optional limit (for progress display)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<String>,
    /// Optional raw numeric value for calculations
    #[serde(skip_serializing_if = "Option::is_none")]
    pub raw_value: Option<i64>,
    /// Optional raw limit for calculations
    #[serde(skip_serializing_if = "Option::is_none")]
    pub raw_limit: Option<i64>,
}
566
/// Agent reported in [`HomeStats::most_active_agent`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct MostActiveAgent {
    /// Identifier of the agent.
    pub id: String,
    /// Name of the agent.
    pub name: String,
    /// Thread count of the agent.
    pub thread_count: i64,
}
573
/// Agent that was recently used (based on thread activity).
///
/// Element type of [`HomeStats::recently_used_agents`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct RecentlyUsedAgent {
    /// Identifier of the agent.
    pub id: String,
    /// Name of the agent.
    pub name: String,
    /// Optional description of the agent.
    pub description: Option<String>,
    /// When the agent was last used (UTC).
    pub last_used_at: chrono::DateTime<chrono::Utc>,
}
582
/// Thread entry listed in [`HomeStats::latest_threads`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct LatestThreadInfo {
    /// Identifier of the thread.
    pub id: String,
    /// Title of the thread.
    pub title: String,
    /// Identifier of the agent associated with the thread.
    pub agent_id: String,
    /// Name of the agent associated with the thread.
    pub agent_name: String,
    /// When the thread was last updated (UTC).
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
591
/// Agent statistics for display.
///
/// Value type of the map returned by [`ThreadStore::get_agent_stats_map`].
/// The derived `Default` yields zero counts and no `last_used_at`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize, ToSchema, JsonSchema)]
pub struct AgentStatsInfo {
    /// Number of threads for the agent.
    pub thread_count: i64,
    /// Usage count as a sub-agent.
    // NOTE(review): exact counting rule is defined by the store — confirm.
    pub sub_agent_usage_count: i64,
    /// When the agent was last used, if ever recorded.
    pub last_used_at: Option<chrono::DateTime<chrono::Utc>>,
}
599
600#[async_trait]
601pub trait AgentStore: Send + Sync {
602    async fn list(
603        &self,
604        cursor: Option<String>,
605        limit: Option<usize>,
606    ) -> (Vec<crate::configuration::AgentConfig>, Option<String>);
607
608    async fn get(&self, name: &str) -> Option<crate::configuration::AgentConfig>;
609    async fn register(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
610    /// Update an existing agent with new definition
611    async fn update(&self, config: crate::configuration::AgentConfig) -> anyhow::Result<()>;
612
613    async fn clear(&self) -> anyhow::Result<()>;
614
615    /// Delete an agent by name or ID
616    async fn delete(&self, id: &str) -> anyhow::Result<()>;
617
618    /// Get an agent with cloud-specific metadata (id, published, is_owner, etc.)
619    /// Default impl returns empty metadata — override in cloud stores.
620    async fn get_with_cloud_metadata(
621        &self,
622        name: &str,
623    ) -> Option<(
624        crate::configuration::AgentConfig,
625        crate::configuration::AgentCloudMetadata,
626    )> {
627        self.get(name)
628            .await
629            .map(|c| (c, crate::configuration::AgentCloudMetadata::default()))
630    }
631
632    /// List agents with cloud-specific metadata.
633    /// Default impl returns empty metadata — override in cloud stores.
634    async fn list_with_cloud_metadata(
635        &self,
636        cursor: Option<String>,
637        limit: Option<usize>,
638    ) -> (
639        Vec<(
640            crate::configuration::AgentConfig,
641            crate::configuration::AgentCloudMetadata,
642        )>,
643        Option<String>,
644    ) {
645        let (configs, cursor) = self.list(cursor, limit).await;
646        (
647            configs
648                .into_iter()
649                .map(|c| (c, crate::configuration::AgentCloudMetadata::default()))
650                .collect(),
651            cursor,
652        )
653    }
654}
655
/// Store for managing scratchpad entries across conversations.
///
/// Unlike most stores in this file, methods report failures as
/// `crate::AgentError` rather than `anyhow::Error`.
#[async_trait::async_trait]
pub trait ScratchpadStore: Send + Sync + std::fmt::Debug {
    /// Add a scratchpad entry for a specific thread
    async fn add_entry(
        &self,
        thread_id: &str,
        entry: ScratchpadEntry,
    ) -> Result<(), crate::AgentError>;

    /// Clear all scratchpad entries for a thread
    async fn clear_entries(&self, thread_id: &str) -> Result<(), crate::AgentError>;

    /// Get entries for a specific task within a thread.
    /// `limit` optionally bounds the number of entries returned.
    async fn get_entries(
        &self,
        thread_id: &str,
        task_id: &str,
        limit: Option<usize>,
    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;

    /// Get entries for a thread without restricting to a task.
    /// `limit` optionally bounds the number of entries returned.
    async fn get_all_entries(
        &self,
        thread_id: &str,
        limit: Option<usize>,
    ) -> Result<Vec<ScratchpadEntry>, crate::AgentError>;
}
683
/// Web crawl result data, stored and retrieved through [`CrawlStore`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct CrawlResult {
    /// Identifier of this crawl result.
    pub id: String,
    /// URL that was crawled.
    pub url: String,
    /// Page title, when available.
    pub title: Option<String>,
    /// Extracted content.
    // NOTE(review): format (plain text / markdown) is not visible here.
    pub content: String,
    /// HTML of the page, when retained.
    pub html: Option<String>,
    /// Arbitrary JSON metadata.
    pub metadata: serde_json::Value,
    /// Links collected from the page.
    pub links: Vec<String>,
    /// Images collected from the page.
    pub images: Vec<String>,
    /// Status code, when known.
    // NOTE(review): presumably the HTTP response status — confirm.
    pub status_code: Option<u16>,
    /// When the crawl happened (UTC).
    pub crawled_at: chrono::DateTime<chrono::Utc>,
    /// Processing time in milliseconds, when measured.
    pub processing_time_ms: Option<u64>,
}
699
/// Store for managing web crawl results
#[async_trait]
pub trait CrawlStore: Send + Sync {
    /// Store a crawl result.
    // NOTE(review): the returned `String` is presumably the stored
    // result's id — confirm against implementations.
    async fn store_crawl_result(&self, result: CrawlResult) -> anyhow::Result<String>;

    /// Get a crawl result by ID
    async fn get_crawl_result(&self, id: &str) -> anyhow::Result<Option<CrawlResult>>;

    /// Get crawl results for a specific URL
    async fn get_crawl_results_by_url(&self, url: &str) -> anyhow::Result<Vec<CrawlResult>>;

    /// Get recent crawl results (within time limit).
    /// `limit` optionally bounds the count; `since` is the optional
    /// lower time bound.
    async fn get_recent_crawl_results(
        &self,
        limit: Option<usize>,
        since: Option<chrono::DateTime<chrono::Utc>>,
    ) -> anyhow::Result<Vec<CrawlResult>>;

    /// Check if URL was recently crawled (within cache duration).
    /// Returns the matching result when there is one, `None` otherwise.
    async fn is_url_recently_crawled(
        &self,
        url: &str,
        cache_duration: chrono::Duration,
    ) -> anyhow::Result<Option<CrawlResult>>;

    /// Delete crawl result
    async fn delete_crawl_result(&self, id: &str) -> anyhow::Result<()>;

    /// Clear all crawl results older than specified date.
    // NOTE(review): the returned `usize` is presumably the number of
    // removed results — confirm.
    async fn cleanup_old_results(
        &self,
        before: chrono::DateTime<chrono::Utc>,
    ) -> anyhow::Result<usize>;
}
735
736// ========== Message Read & Voting Types ==========
737
/// Vote type for message feedback.
///
/// Serialized with lowercase variant names, i.e. `"upvote"` and
/// `"downvote"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, ToSchema, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum VoteType {
    Upvote,
    Downvote,
}
745
/// Record of a message being read, as returned by the read-status methods
/// of [`ThreadStore`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct MessageReadStatus {
    /// Thread containing the message.
    pub thread_id: String,
    /// Message that was read.
    pub message_id: String,
    /// User who read the message.
    pub user_id: String,
    /// When the message was read (UTC).
    pub read_at: chrono::DateTime<chrono::Utc>,
}
754
/// Request to mark a message as read
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct MarkMessageReadRequest {
    /// Thread containing the message.
    pub thread_id: String,
    /// Message to mark as read.
    pub message_id: String,
}
761
/// A vote on a message with optional feedback comment
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct MessageVote {
    /// Identifier of the vote record.
    pub id: String,
    /// Thread containing the voted message.
    pub thread_id: String,
    /// Message the vote applies to.
    pub message_id: String,
    /// User who cast the vote.
    pub user_id: String,
    /// Whether this is an upvote or a downvote.
    pub vote_type: VoteType,
    /// Comment is required for downvotes, optional for upvotes
    pub comment: Option<String>,
    /// When the vote was created (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the vote was last updated (UTC).
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
775
776/// Request to vote on a message
777#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
778#[schema(example = json!({"vote_type": "up"}))]
779pub struct VoteMessageRequest {
780    pub thread_id: String,
781    pub message_id: String,
782    pub vote_type: VoteType,
783    /// Required for downvotes
784    pub comment: Option<String>,
785}
786
/// Summary of votes for a message, as returned by
/// [`ThreadStore::get_message_vote_summary`].
///
/// The derived `Default` yields an empty `message_id`, zero counts, and no
/// user vote.
#[derive(Debug, Clone, Serialize, Deserialize, Default, ToSchema, JsonSchema)]
pub struct MessageVoteSummary {
    /// Message the summary refers to.
    pub message_id: String,
    /// Number of upvotes.
    pub upvotes: i64,
    /// Number of downvotes.
    pub downvotes: i64,
    /// Current user's vote on this message, if any
    pub user_vote: Option<VoteType>,
}
796
/// Store for managing external tool call completions using oneshot channels.
///
/// Calls are keyed by `session_id`; the response type carried over the
/// channel is `ToolResponse`.
#[async_trait]
pub trait ExternalToolCallsStore: Send + Sync + std::fmt::Debug {
    /// Register a new external tool call session and return a receiver for the response
    async fn register_external_tool_call(
        &self,
        session_id: &str,
    ) -> anyhow::Result<oneshot::Receiver<ToolResponse>>;

    /// Complete an external tool call by sending the response
    // NOTE(review): behaviour for an unknown `session_id` (error vs no-op)
    // is defined by the implementation — confirm.
    async fn complete_external_tool_call(
        &self,
        session_id: &str,
        tool_response: ToolResponse,
    ) -> anyhow::Result<()>;

    /// Remove a session (cleanup)
    async fn remove_tool_call(&self, session_id: &str) -> anyhow::Result<()>;

    /// List all pending sessions (for debugging)
    async fn list_pending_tool_calls(&self) -> anyhow::Result<Vec<String>>;
}
819
820// ========== Prompt Template Store ==========
821
/// Stored prompt template, as returned by [`PromptTemplateStore`].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
pub struct PromptTemplateRecord {
    /// Identifier of the template record.
    pub id: String,
    /// Template name.
    pub name: String,
    /// Template body.
    pub template: String,
    /// Optional description.
    pub description: Option<String>,
    /// Optional version label.
    pub version: Option<String>,
    /// Whether this is a system template.
    // NOTE(review): presumably set for templates installed through
    // `PromptTemplateStore::sync_system_templates` — confirm.
    pub is_system: bool,
    /// When the record was created (UTC).
    pub created_at: chrono::DateTime<chrono::Utc>,
    /// When the record was last updated (UTC).
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
833
834#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
835#[schema(example = json!({"name": "greeting", "content": "Hello {{name}}, welcome to {{service}}!", "description": "A greeting template"}))]
836pub struct NewPromptTemplate {
837    pub name: String,
838    pub template: String,
839    pub description: Option<String>,
840    pub version: Option<String>,
841    #[serde(default)]
842    pub is_system: bool,
843}
844
845#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
846pub struct UpdatePromptTemplate {
847    pub name: String,
848    pub template: String,
849    pub description: Option<String>,
850}
851
852#[async_trait]
853pub trait PromptTemplateStore: Send + Sync {
854    async fn list(&self) -> anyhow::Result<Vec<PromptTemplateRecord>>;
855    async fn get(&self, id: &str) -> anyhow::Result<Option<PromptTemplateRecord>>;
856    /// Fetch multiple templates by name in a single query.
857    async fn get_by_names(&self, names: &[String]) -> anyhow::Result<Vec<PromptTemplateRecord>>;
858    async fn create(&self, template: NewPromptTemplate) -> anyhow::Result<PromptTemplateRecord>;
859    async fn update(
860        &self,
861        id: &str,
862        update: UpdatePromptTemplate,
863    ) -> anyhow::Result<PromptTemplateRecord>;
864    async fn delete(&self, id: &str) -> anyhow::Result<()>;
865    async fn clone_template(&self, id: &str) -> anyhow::Result<PromptTemplateRecord>;
866    async fn sync_system_templates(&self, templates: Vec<NewPromptTemplate>) -> anyhow::Result<()>;
867}
868
869// ========== Secret Store ==========
870
871#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
872pub struct SecretRecord {
873    pub id: String,
874    pub key: String,
875    pub value: String,
876    pub created_at: chrono::DateTime<chrono::Utc>,
877    pub updated_at: chrono::DateTime<chrono::Utc>,
878}
879
880#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
881#[schema(example = json!({"key": "OPENAI_API_KEY", "value": "sk-..."}))]
882pub struct NewSecret {
883    pub key: String,
884    pub value: String,
885}
886
887#[async_trait]
888pub trait SecretStore: Send + Sync {
889    async fn list(&self) -> anyhow::Result<Vec<SecretRecord>>;
890    async fn get(&self, key: &str) -> anyhow::Result<Option<SecretRecord>>;
891    async fn create(&self, secret: NewSecret) -> anyhow::Result<SecretRecord>;
892    async fn update(&self, key: &str, value: &str) -> anyhow::Result<SecretRecord>;
893    async fn delete(&self, key: &str) -> anyhow::Result<()>;
894}
895
896// ========== Provider Store ==========
897
898#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
899pub struct CustomProviderConfig {
900    pub id: String,
901    pub name: String,
902    pub base_url: String,
903    #[serde(default, skip_serializing_if = "Option::is_none")]
904    pub project_id: Option<String>,
905}
906
907#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
908pub struct CustomModelEntry {
909    pub provider: String,
910    pub model: String,
911    /// "completion" (default), "tts", or "stt"
912    #[serde(default = "default_completion")]
913    pub capability: String,
914}
915
/// Serde default for [`CustomModelEntry::capability`]: `"completion"`.
fn default_completion() -> String {
    String::from("completion")
}
919
920/// A custom connection provider (OAuth integration) stored in workspace settings.
921#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
922pub struct ConnectionProviderConfig {
923    /// Unique identifier (e.g., "linear", "figma", "custom_crm")
924    pub id: String,
925    /// Display name
926    pub name: String,
927    /// OAuth2 authorization URL
928    pub authorization_url: String,
929    /// OAuth2 token URL
930    pub token_url: String,
931    /// Optional refresh URL (defaults to token_url)
932    #[serde(default, skip_serializing_if = "Option::is_none")]
933    pub refresh_url: Option<String>,
934    /// Scopes the provider supports
935    #[serde(default)]
936    pub scopes_supported: Vec<String>,
937    /// Default scopes to request
938    #[serde(default)]
939    pub default_scopes: Vec<String>,
940    /// Friendly scope name → full scope string mappings
941    #[serde(default)]
942    pub scope_mappings: std::collections::HashMap<String, String>,
943}
944
945/// Request payload for upserting a provider configuration.
946#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
947pub struct UpsertProviderRequest {
948    pub provider_id: String,
949    #[serde(default)]
950    pub secrets: std::collections::HashMap<String, String>,
951    #[serde(default)]
952    pub config: Option<CustomProviderConfig>,
953    #[serde(default)]
954    pub custom_models: Option<Vec<CustomModelEntry>>,
955    /// Default model in "provider/model" format. Empty string or null to clear.
956    #[serde(default)]
957    pub default_model: Option<String>,
958    /// Connection provider config (OAuth integration) to add/update.
959    #[serde(default)]
960    pub connection_provider: Option<ConnectionProviderConfig>,
961}
962
963/// Response after upserting a provider.
964#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
965pub struct UpsertProviderResponse {
966    pub provider_id: String,
967    pub secrets_saved: usize,
968    pub config_saved: bool,
969}
970
971/// Request to validate an already-configured provider — `POST
972/// /v1/providers/test`. Credentials are resolved from stored config
973/// server-side; nothing sensitive is sent over the wire.
974#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
975pub struct TestProviderRequest {
976    /// Built-in provider id (e.g. `azure_ai_foundry`) or a custom provider id.
977    pub provider_id: String,
978}
979
/// Probe target resolved for a provider from stored config: the
/// OpenAI-compatible base URL plus the API key. Internal (store → route);
/// not a wire type.
#[derive(Debug, Clone)]
pub struct ResolvedProviderEndpoint {
    /// Base URL to probe.
    pub base_url: String,
    /// API key to use for the probe.
    pub api_key: String,
}
987
988/// Result of a `POST /v1/providers/test` probe.
989#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
990pub struct TestProviderResponse {
991    /// True when the endpoint answered `GET /models` successfully.
992    pub ok: bool,
993    /// Model ids the endpoint reported, when reachable.
994    #[serde(default)]
995    pub models: Vec<String>,
996    /// Failure detail when `ok == false`.
997    #[serde(default, skip_serializing_if = "Option::is_none")]
998    pub error: Option<String>,
999}
1000
1001#[async_trait]
1002pub trait ProviderStore: Send + Sync {
1003    async fn upsert_provider(
1004        &self,
1005        req: UpsertProviderRequest,
1006    ) -> anyhow::Result<UpsertProviderResponse>;
1007
1008    async fn delete_provider(&self, provider_id: &str) -> anyhow::Result<()>;
1009
1010    async fn get_default_model(&self) -> anyhow::Result<Option<String>>;
1011
1012    /// Resolve a provider's probe target (base URL + API key) from stored
1013    /// config, for the `POST /v1/providers/test` validation endpoint.
1014    async fn resolve_provider_endpoint(
1015        &self,
1016        provider_id: &str,
1017    ) -> anyhow::Result<ResolvedProviderEndpoint>;
1018}
1019
1020/// Resolve a provider's `(base_url, api_key)` for the `/providers/test`
1021/// probe. Built-in providers hydrate from their canonical secret keys;
1022/// custom providers take `base_url` from `custom_providers` and the key
1023/// from `{PROVIDER_ID}_API_KEY`. Shared by the cloud and standalone stores.
1024pub async fn resolve_provider_test_endpoint(
1025    provider_id: &str,
1026    secret_store: &dyn SecretStore,
1027    custom_providers: &[CustomProviderConfig],
1028) -> anyhow::Result<ResolvedProviderEndpoint> {
1029    use crate::agent::ModelSettings;
1030    // Built-in provider: build its ModelProvider and hydrate from secrets.
1031    if let Ok(Some(mut ms)) = ModelSettings::from_provider_model_str(&format!("{provider_id}/_")) {
1032        ms.hydrate_creds(secret_store)
1033            .await
1034            .map_err(|e| anyhow::anyhow!(e))?;
1035        let (base_url, api_key) = ms.inner.provider.resolved_endpoint();
1036        let base_url = base_url.filter(|u| !u.trim().is_empty()).ok_or_else(|| {
1037            anyhow::anyhow!("provider '{provider_id}' has no endpoint configured")
1038        })?;
1039        return Ok(ResolvedProviderEndpoint {
1040            base_url,
1041            api_key: api_key.unwrap_or_default(),
1042        });
1043    }
1044    // Custom OpenAI-compatible provider.
1045    let cp = custom_providers
1046        .iter()
1047        .find(|p| p.id == provider_id)
1048        .ok_or_else(|| anyhow::anyhow!("unknown provider '{provider_id}'"))?;
1049    let api_key = secret_store
1050        .get(&format!("{}_API_KEY", provider_id.to_uppercase()))
1051        .await
1052        .map_err(|e| anyhow::anyhow!(e))?
1053        .map(|s| s.value)
1054        .unwrap_or_default();
1055    Ok(ResolvedProviderEndpoint {
1056        base_url: cp.base_url.clone(),
1057        api_key,
1058    })
1059}
1060
1061/// Provider-related settings for the single-tenant standalone server.
1062///
1063/// Persisted as the `config_json` of the one `server_settings` row. This
1064/// mirrors the provider-relevant subset of the cloud's per-workspace
1065/// `WorkspaceSettings` — the standalone server has no `workspaces` table,
1066/// so there is exactly one of these.
1067#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, JsonSchema)]
1068pub struct ServerSettings {
1069    /// Default model in `"provider/model"` format.
1070    #[serde(default, skip_serializing_if = "Option::is_none")]
1071    pub default_model: Option<String>,
1072    /// Custom (non-built-in) provider definitions.
1073    #[serde(default)]
1074    pub custom_providers: Vec<CustomProviderConfig>,
1075    /// Custom model entries, each keyed to a provider id.
1076    #[serde(default)]
1077    pub custom_models: Vec<CustomModelEntry>,
1078    /// Connection (OAuth) provider definitions.
1079    #[serde(default)]
1080    pub connection_providers: Vec<ConnectionProviderConfig>,
1081}
1082
1083// ========== Skill Store ==========
1084
1085/// How a skill is executed relative to the calling agent's context.
1086/// Mirrors the `context` field in claude-code's prompt command spec.
1087#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, ToSchema, JsonSchema)]
1088#[serde(rename_all = "lowercase")]
1089pub enum ContextExecutionType {
1090    /// Inject the full skill content into the current agent's context window.
1091    /// The calling agent incorporates it directly — no sub-agent spawned.
1092    #[default]
1093    Inline,
1094    /// Spawn an isolated child agent with the skill as its instruction set.
1095    /// The child runs with its own token budget and task record; its result
1096    /// is summarised and returned to the parent.
1097    Fork,
1098}
1099
1100impl std::fmt::Display for ContextExecutionType {
1101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1102        match self {
1103            ContextExecutionType::Inline => write!(f, "inline"),
1104            ContextExecutionType::Fork => write!(f, "fork"),
1105        }
1106    }
1107}
1108
1109impl std::str::FromStr for ContextExecutionType {
1110    type Err = ();
1111    fn from_str(s: &str) -> Result<Self, Self::Err> {
1112        match s {
1113            "fork" => Ok(ContextExecutionType::Fork),
1114            _ => Ok(ContextExecutionType::Inline),
1115        }
1116    }
1117}
1118
/// Token budget shared by all skill listings in the system prompt.
pub const SKILL_LISTING_BUDGET: usize = 2000;
/// Cap on the description length of one skill in the system prompt listing.
pub const SKILL_DESCRIPTION_CAP: usize = 250;
/// Max output tokens assumed for a skill that does not set one explicitly.
pub const DEFAULT_SKILL_MAX_TOKENS: u32 = 8_000;
1125
1126/// Parsed frontmatter from a SKILL.md file (agentskills.io spec).
1127///
1128/// Per the spec at https://agentskills.io/specification:
1129/// - `name` — required, lowercase a-z + hyphens, must match parent directory.
1130/// - `description` — required.
1131/// - `license` — optional license name or path.
1132/// - `compatibility` — optional environment requirements string.
1133/// - `metadata` — optional free-form key/value map (where distri-specific
1134///    knobs like `model`, `max_tokens`, `can_spawn_tasks` live).
1135/// - `allowed_tools` — optional pre-approved tools list (experimental).
1136///
1137/// All distri-specific runtime hints are read from `metadata` so the file
1138/// is portable to any agentskills.io-compliant client.
1139#[derive(Debug, Clone, Serialize, Deserialize, Default, ToSchema, JsonSchema)]
1140pub struct SkillFrontmatter {
1141    pub name: String,
1142    #[serde(default)]
1143    pub description: Option<String>,
1144    #[serde(default, skip_serializing_if = "Option::is_none")]
1145    pub license: Option<String>,
1146    #[serde(default, skip_serializing_if = "Option::is_none")]
1147    pub compatibility: Option<String>,
1148    #[serde(default, skip_serializing_if = "std::collections::HashMap::is_empty")]
1149    pub metadata: std::collections::HashMap<String, String>,
1150    /// Maps to `allowed-tools` on the wire (per agentskills.io spec).
1151    #[serde(
1152        default,
1153        rename = "allowed-tools",
1154        skip_serializing_if = "Option::is_none"
1155    )]
1156    pub allowed_tools: Option<String>,
1157}
1158
1159impl SkillFrontmatter {
1160    /// Distri-specific runtime hint stored in `metadata.model`.
1161    pub fn model(&self) -> Option<&str> {
1162        self.metadata.get("model").map(|s| s.as_str())
1163    }
1164
1165    /// Distri-specific runtime hint stored in `metadata.max_tokens`.
1166    pub fn max_tokens(&self) -> Option<u32> {
1167        self.metadata.get("max_tokens").and_then(|s| s.parse().ok())
1168    }
1169
1170    /// Distri-specific runtime hint stored in `metadata.can_spawn_tasks`.
1171    pub fn can_spawn_tasks(&self) -> bool {
1172        self.metadata
1173            .get("can_spawn_tasks")
1174            .map(|s| s == "true" || s == "yes")
1175            .unwrap_or(false)
1176    }
1177
1178    /// Distri-specific runtime hint stored as a comma- or space-separated list.
1179    pub fn tags(&self) -> Vec<String> {
1180        self.metadata
1181            .get("tags")
1182            .map(|s| {
1183                s.split(|c: char| c == ',' || c.is_whitespace())
1184                    .filter(|t| !t.is_empty())
1185                    .map(|t| t.trim().to_string())
1186                    .collect()
1187            })
1188            .unwrap_or_default()
1189    }
1190
1191    pub fn effective_max_tokens(&self) -> u32 {
1192        self.max_tokens().unwrap_or(DEFAULT_SKILL_MAX_TOKENS)
1193    }
1194
1195    pub fn as_listing_line(&self) -> String {
1196        let desc = self.description.as_deref().unwrap_or("No description");
1197        let desc_truncated = if desc.len() > SKILL_DESCRIPTION_CAP {
1198            format!("{}...", &desc[..SKILL_DESCRIPTION_CAP.min(desc.len())])
1199        } else {
1200            desc.to_string()
1201        };
1202        let mut meta = Vec::new();
1203        if let Some(model) = self.model() {
1204            meta.push(format!("model: {}", model));
1205        }
1206        if self.can_spawn_tasks() {
1207            meta.push("tasks: yes".to_string());
1208        }
1209        if meta.is_empty() {
1210            format!("- {}: {}", self.name, desc_truncated)
1211        } else {
1212            format!("- {}: {} ({})", self.name, desc_truncated, meta.join(", "))
1213        }
1214    }
1215}
1216
1217/// Format a list of skills for the system prompt, respecting a token budget.
1218pub fn format_skill_listing(skills: &[SkillFrontmatter], budget_tokens: usize) -> String {
1219    let budget_chars = budget_tokens * 4;
1220    let mut result = String::new();
1221    let mut remaining_chars = budget_chars;
1222    for skill in skills {
1223        let line = format!("{}\n", skill.as_listing_line());
1224        if line.len() > remaining_chars {
1225            let name_line = format!("- {}\n", skill.name);
1226            if name_line.len() <= remaining_chars {
1227                result.push_str(&name_line);
1228                remaining_chars -= name_line.len();
1229            } else {
1230                break;
1231            }
1232        } else {
1233            result.push_str(&line);
1234            remaining_chars -= line.len();
1235        }
1236    }
1237    result.trim_end().to_string()
1238}
1239
1240/// API response wrapper for skill list endpoints.
1241#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1242pub struct SkillsListResponse {
1243    pub skills: Vec<SkillListItem>,
1244}
1245
1246/// Lighter skill record for list endpoints — no content or scripts.
1247/// Used by both distri-server (OSS) and distri-cloud.
1248///
1249/// Note: marketplace fields (`is_public`, `is_system`, `star_count`,
1250/// `clone_count`, `is_starred`) were removed. Skills are workspace-scoped;
1251/// public discovery happens through external registries.
1252#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1253pub struct SkillListItem {
1254    pub id: String,
1255    #[serde(default)]
1256    pub workspace_slug: String,
1257    pub name: String,
1258    #[serde(default)]
1259    pub full_name: String,
1260    #[serde(default)]
1261    pub description: Option<String>,
1262    #[serde(default)]
1263    pub tags: Vec<String>,
1264    #[serde(default)]
1265    pub is_owner: bool,
1266    /// True when the skill belongs to the current workspace
1267    #[serde(default)]
1268    pub is_workspace: bool,
1269    pub created_at: chrono::DateTime<chrono::Utc>,
1270    pub updated_at: chrono::DateTime<chrono::Utc>,
1271}
1272
1273/// Full skill record — content + metadata. Marketplace fields removed; see
1274/// `SkillListItem` doc comment.
1275#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1276pub struct SkillRecord {
1277    pub id: String,
1278    /// Workspace slug (cloud: resolved from workspace_id, OSS: "local")
1279    #[serde(default)]
1280    pub workspace_slug: String,
1281    pub name: String,
1282    /// Full qualified name: "{workspace_slug}/{name}"
1283    #[serde(default)]
1284    pub full_name: String,
1285    pub description: Option<String>,
1286    pub content: String,
1287    pub tags: Vec<String>,
1288    /// Whether the current user owns this skill
1289    #[serde(default)]
1290    pub is_owner: bool,
1291    /// True when the skill belongs to the current workspace
1292    #[serde(default)]
1293    pub is_workspace: bool,
1294    pub created_at: chrono::DateTime<chrono::Utc>,
1295    pub updated_at: chrono::DateTime<chrono::Utc>,
1296    /// Preferred model for skill execution (overrides agent default)
1297    #[serde(default, skip_serializing_if = "Option::is_none")]
1298    pub model: Option<String>,
1299    /// How to deliver skill content: inline (default) or fork (isolated sub-agent)
1300    #[serde(default)]
1301    pub context: ContextExecutionType,
1302}
1303
1304#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1305#[schema(example = json!({"name": "my-skill", "content": "# My Skill\nA helpful utility skill", "description": "A utility skill", "tags": ["utility"]}))]
1306pub struct NewSkill {
1307    pub name: String,
1308    pub description: Option<String>,
1309    pub content: String,
1310    #[serde(default)]
1311    pub tags: Vec<String>,
1312    #[serde(default, skip_serializing_if = "Option::is_none")]
1313    pub model: Option<String>,
1314    #[serde(default)]
1315    pub context: ContextExecutionType,
1316}
1317
1318#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1319pub struct UpdateSkill {
1320    pub name: Option<String>,
1321    pub description: Option<String>,
1322    pub content: Option<String>,
1323    pub tags: Option<Vec<String>>,
1324    #[serde(default, skip_serializing_if = "Option::is_none")]
1325    pub model: Option<String>,
1326    #[serde(default, skip_serializing_if = "Option::is_none")]
1327    pub context: Option<ContextExecutionType>,
1328}
1329
1330/// Which slice of skills to return.
1331#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
1332#[serde(rename_all = "snake_case")]
1333pub enum SkillScope {
1334    /// Skills belonging to the current workspace
1335    #[default]
1336    Workspace,
1337    /// Discover external skills (skillsmp.com / GitHub registries)
1338    Discover,
1339    /// Workspace + discover combined
1340    All,
1341}
1342
1343/// Filters for listing skills — one struct drives list, search, and pagination.
1344///
1345/// `Default` yields `page = 1, per_page = 50` so Rust-side callers like
1346/// `client.upsert_skill(...)` hit the correct first page. `#[serde(default)]`
1347/// uses the same values for missing JSON fields.
1348#[derive(Debug, Clone, Serialize, Deserialize)]
1349pub struct SkillFilter {
1350    /// Which slice of skills to return
1351    #[serde(default)]
1352    pub scope: SkillScope,
1353    /// Full-text search on name/description (empty = no search filter)
1354    #[serde(default)]
1355    pub search: Option<String>,
1356    /// Page number (1-based, default 1)
1357    #[serde(default = "default_page")]
1358    pub page: i64,
1359    /// Items per page (default 50)
1360    #[serde(default = "default_per_page")]
1361    pub per_page: i64,
1362}
1363
1364impl Default for SkillFilter {
1365    fn default() -> Self {
1366        Self {
1367            scope: SkillScope::default(),
1368            search: None,
1369            page: default_page(),
1370            per_page: default_per_page(),
1371        }
1372    }
1373}
1374
/// Default for `SkillFilter::page`: the first (1-based) page.
fn default_page() -> i64 {
    1_i64
}
/// Default for `SkillFilter::per_page`: 50 items per page.
fn default_per_page() -> i64 {
    50_i64
}
1381
1382/// Paginated skill list response.
1383#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
1384pub struct SkillListResponse {
1385    pub skills: Vec<SkillListItem>,
1386    pub total: i64,
1387    pub page: i64,
1388    pub per_page: i64,
1389    pub total_pages: i64,
1390}
1391
1392#[async_trait]
1393pub trait SkillStore: Send + Sync {
1394    /// List skills — scope, search, and pagination all via SkillFilter.
1395    async fn list(&self, filter: SkillFilter) -> anyhow::Result<SkillListResponse>;
1396    async fn get(&self, id: &str) -> anyhow::Result<Option<SkillRecord>>;
1397    async fn create(&self, skill: NewSkill) -> anyhow::Result<SkillRecord>;
1398    async fn update(&self, id: &str, update: UpdateSkill) -> anyhow::Result<SkillRecord>;
1399    async fn delete(&self, id: &str) -> anyhow::Result<()>;
1400
1401    /// Create-or-update a skill by name in the caller's current workspace.
1402    ///
1403    /// Mirrors `AgentStore::register` semantics: `distri skills push` is an
1404    /// UPSERT, not a CREATE. Implementations SHOULD do this atomically against
1405    /// the `(workspace_id, name)` unique constraint (Postgres: `ON CONFLICT
1406    /// DO UPDATE`). The default impl below is a fall-back for backends that
1407    /// don't have a native upsert — it performs the list+update-or-create
1408    /// dance the old client used to do, and inherits its races, so backends
1409    /// should override it whenever possible.
1410    async fn upsert_by_name(&self, skill: NewSkill) -> anyhow::Result<SkillRecord> {
1411        let response = self
1412            .list(SkillFilter {
1413                scope: SkillScope::Workspace,
1414                ..Default::default()
1415            })
1416            .await?;
1417        if let Some(existing) = response.skills.iter().find(|s| s.name == skill.name) {
1418            return self
1419                .update(
1420                    &existing.id,
1421                    UpdateSkill {
1422                        name: Some(skill.name),
1423                        description: skill.description,
1424                        content: Some(skill.content),
1425                        tags: Some(skill.tags),
1426                        model: skill.model,
1427                        context: Some(skill.context),
1428                    },
1429                )
1430                .await;
1431        }
1432        self.create(skill).await
1433    }
1434}
1435
1436// ─── Usage Service ──────────────────────────────────────────────────────────
1437
1438/// Current usage snapshot for a workspace/user.
1439#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, JsonSchema)]
1440pub struct UsageSnapshot {
1441    pub day_tokens: i64,
1442    pub week_tokens: i64,
1443    pub month_tokens: i64,
1444}
1445
1446/// Configured token limits for a workspace.
1447#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema, JsonSchema)]
1448pub struct UsageLimits {
1449    pub daily_tokens: Option<i64>,
1450    pub weekly_tokens: Option<i64>,
1451    pub monthly_tokens: Option<i64>,
1452}
1453
/// Outcome of a rate limit check.
#[derive(Debug, Clone)]
pub enum UsageCheckResult {
    /// The request may proceed.
    Allowed,
    /// The request is rejected; `reason` carries the explanation.
    Denied { reason: String },
}
1460
1461/// Trait for usage tracking, rate limiting, and workspace limit management.
1462///
1463/// OSS: can use a no-op or in-memory implementation.
1464/// Cloud: backed by Redis + Postgres with caching.
1465#[async_trait]
1466pub trait UsageService: Send + Sync {
1467    /// Check whether a request should be allowed based on all rate limits.
1468    /// Called by middleware before processing a request.
1469    /// `is_llm` indicates whether this is an LLM-consuming endpoint.
1470    /// `auth_source` is "jwt" or "api_key" for per-source analytics.
1471    async fn check_request(
1472        &self,
1473        workspace_id: &str,
1474        user_id: &str,
1475        is_llm: bool,
1476        auth_source: &str,
1477    ) -> UsageCheckResult;
1478
1479    /// Record token usage after a completed agent run.
1480    async fn record_usage(
1481        &self,
1482        workspace_id: &str,
1483        user_id: &str,
1484        tokens_used: i64,
1485    ) -> anyhow::Result<()>;
1486
1487    /// Get current usage snapshot for display.
1488    async fn get_usage(&self, workspace_id: &str, user_id: &str) -> anyhow::Result<UsageSnapshot>;
1489
1490    /// Get the configured limits for a workspace.
1491    async fn get_limits(&self, workspace_id: &str) -> anyhow::Result<UsageLimits>;
1492}
1493
/// Usage service that does nothing, for OSS / development setups.
/// Every request is allowed and nothing is ever recorded.
#[derive(Debug, Clone)]
pub struct NoOpUsageService;
1498
1499#[async_trait]
1500impl UsageService for NoOpUsageService {
1501    async fn check_request(
1502        &self,
1503        _workspace_id: &str,
1504        _user_id: &str,
1505        _is_llm: bool,
1506        _auth_source: &str,
1507    ) -> UsageCheckResult {
1508        UsageCheckResult::Allowed
1509    }
1510
1511    async fn record_usage(
1512        &self,
1513        _workspace_id: &str,
1514        _user_id: &str,
1515        _tokens_used: i64,
1516    ) -> anyhow::Result<()> {
1517        Ok(())
1518    }
1519
1520    async fn get_usage(
1521        &self,
1522        _workspace_id: &str,
1523        _user_id: &str,
1524    ) -> anyhow::Result<UsageSnapshot> {
1525        Ok(UsageSnapshot::default())
1526    }
1527
1528    async fn get_limits(&self, _workspace_id: &str) -> anyhow::Result<UsageLimits> {
1529        Ok(UsageLimits::default())
1530    }
1531}
1532
1533// ========== Connection Store ==========
1534
1535/// Persistence for connection records (Postgres-backed in cloud).
1536#[async_trait]
1537pub trait ConnectionStore: Send + Sync + 'static {
1538    async fn create(&self, connection: NewConnection) -> anyhow::Result<Connection>;
1539    async fn get_by_id(&self, id: &str) -> anyhow::Result<Option<Connection>>;
1540    async fn list_by_workspace(&self, workspace_id: &str) -> anyhow::Result<Vec<Connection>>;
1541    async fn update_status(&self, id: &str, status: ConnectionStatus) -> anyhow::Result<()>;
1542    async fn update_skill_id(&self, id: &str, skill_id: uuid::Uuid) -> anyhow::Result<()>;
1543    /// Rename a connection. Editing the embedded `auth` schema goes through
1544    /// `update_auth` instead.
1545    async fn update(&self, id: &str, name: Option<String>) -> anyhow::Result<Connection>;
1546    /// Replace the `config` JSONB. Used to round-trip workspace-scope
1547    /// `extra_auth_params` so the Edit dialog can pre-fill them.
1548    async fn update_config(&self, id: &str, config: serde_json::Value) -> anyhow::Result<()> {
1549        let _ = (id, config);
1550        Err(anyhow::anyhow!(
1551            "update_config not implemented for this ConnectionStore"
1552        ))
1553    }
1554    /// Replace the `auth` JSONB blob. Used by the
1555    /// `POST /v1/connections/{id}/resync-provider` admin endpoint to re-apply
1556    /// the catalog-provided `OAuthProviderConfig` onto an existing
1557    /// connection. `auth` must be a serialized `ConnectionAuth`.
1558    async fn update_auth(&self, id: &str, auth: serde_json::Value) -> anyhow::Result<()> {
1559        let _ = (id, auth);
1560        Err(anyhow::anyhow!(
1561            "update_auth not implemented for this ConnectionStore"
1562        ))
1563    }
1564    async fn delete(&self, id: &str) -> anyhow::Result<()>;
1565    /// Look up by `(workspace_id, provider)`. Resolution matches on
1566    /// `connections.auth->>'provider'` for OAuth.
1567    async fn get_by_provider(
1568        &self,
1569        workspace_id: &str,
1570        provider: &str,
1571    ) -> anyhow::Result<Option<Connection>>;
1572}
1573
1574/// Token storage for OAuth-auth connections (Redis-backed in cloud).
1575///
1576/// **Two key shapes coexist** for the two `AuthScope`s:
1577///
1578/// - **Workspace** scope → tokens stored under `connection_id` alone. One
1579///   slot per connection, shared by every workspace member. `store_token`,
1580///   `get_token`, `refresh_token` operate on this shape.
1581/// - **User** scope → sessions stored per `(connection_id, user_id)`. Each
1582///   end-user authorises themselves; their tokens never bleed into the
1583///   workspace slot. `get_user_session` / `refresh_user_session` operate
1584///   on this shape. The cloud implementation persists these via the
1585///   `ToolAuthStore` (Redis `oauth:session:{provider}:{ws}:{conn}:{user}`),
1586///   but the API exposes a single `AuthSession` so resolvers don't need
1587///   to know which underlying store.
1588#[async_trait]
1589pub trait ConnectionTokenStore: Send + Sync + 'static {
1590    async fn store_token(&self, connection_id: &str, token: ConnectionToken) -> anyhow::Result<()>;
1591    async fn get_token(&self, connection_id: &str) -> anyhow::Result<Option<ConnectionToken>>;
1592    async fn remove_token(&self, connection_id: &str) -> anyhow::Result<()>;
1593
1594    /// Attempt to refresh an expired **workspace-scope** OAuth token using
1595    /// the stored refresh_token. Returns the new token if refresh
1596    /// succeeds, or None if refresh is not supported or fails. The
1597    /// implementation should store the refreshed token.
1598    ///
1599    /// Cloud implementation uses OAuthHandler.refresh_get_session().
1600    /// Default: no refresh support (returns None).
1601    async fn refresh_token(
1602        &self,
1603        _connection_id: &str,
1604        _connection: &Connection,
1605    ) -> anyhow::Result<Option<ConnectionToken>> {
1606        Ok(None)
1607    }
1608
1609    /// Read the **user-scope** OAuth session for a specific end-user on a
1610    /// specific Connection. Returns `None` when the user hasn't completed
1611    /// the configure flow yet. Default impl returns None — only the cloud
1612    /// `RedisOAuthStore` actually has the underlying `ToolAuthStore` to
1613    /// dispatch to.
1614    async fn get_user_session(
1615        &self,
1616        _connection: &Connection,
1617        _user_id: &str,
1618    ) -> anyhow::Result<Option<crate::auth::AuthSession>> {
1619        Ok(None)
1620    }
1621
1622    /// Refresh an expired **user-scope** OAuth session in place. Returns
1623    /// the new session on success, `None` if refresh isn't supported / the
1624    /// refresh_token is missing / the provider rejected the refresh.
1625    /// Default: no refresh support.
1626    async fn refresh_user_session(
1627        &self,
1628        _connection: &Connection,
1629        _user_id: &str,
1630    ) -> anyhow::Result<Option<crate::auth::AuthSession>> {
1631        Ok(None)
1632    }
1633
1634    async fn store_oauth_state(
1635        &self,
1636        state_key: &str,
1637        state: serde_json::Value,
1638    ) -> anyhow::Result<()>;
1639    async fn get_oauth_state(&self, state_key: &str) -> anyhow::Result<Option<serde_json::Value>>;
1640    async fn remove_oauth_state(&self, state_key: &str) -> anyhow::Result<()>;
1641}
1642
1643// ========== Note Store ==========
1644
1645/// Persistence for workspace notes.
1646///
1647/// OSS: backed by SQLite via DieselNoteStore.
1648/// Cloud: backed by Postgres via the existing NoteStore in distri-cloud.
1649#[async_trait]
1650pub trait NoteStore: Send + Sync + 'static {
1651    /// List notes, optionally filtering by tag or full-text search.
1652    async fn list(
1653        &self,
1654        query: &crate::api::notes::ListNotesQuery,
1655    ) -> anyhow::Result<Vec<crate::api::notes::NoteRecord>>;
1656
1657    /// Fetch a single note by ID.
1658    async fn get(&self, id: Uuid) -> anyhow::Result<Option<crate::api::notes::NoteRecord>>;
1659
1660    /// Create a new note.
1661    async fn create(
1662        &self,
1663        req: crate::api::notes::CreateNoteRequest,
1664    ) -> anyhow::Result<crate::api::notes::NoteRecord>;
1665
1666    /// Update an existing note; returns the updated record or `None` if not found.
1667    async fn update(
1668        &self,
1669        id: Uuid,
1670        req: crate::api::notes::UpdateNoteRequest,
1671    ) -> anyhow::Result<Option<crate::api::notes::NoteRecord>>;
1672
1673    /// Delete a note. Returns `true` if the note existed and was deleted.
1674    async fn delete(&self, id: Uuid) -> anyhow::Result<bool>;
1675
1676    /// Full-text search on title and content.
1677    async fn search(&self, query: &str) -> anyhow::Result<Vec<crate::api::notes::NoteRecord>>;
1678}
1679
1680// ========== Span Store ==========
1681
/// Query selector for listing spans.
///
/// Passed to `SpanStore::list_spans` to choose which key the lookup
/// uses. The common traits are derived so a query can be logged, cloned
/// and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanQuery {
    /// Select spans by thread id.
    ByThreadId(String),
    /// Select spans by trace id.
    ByTraceId(String),
}
1687
1688/// Persistence for OTel span records.
1689///
1690/// Cloud implements this on top of Postgres; distri-server ships an
1691/// in-memory implementation that retains spans for the lifetime of the
1692/// process.
1693#[async_trait]
1694pub trait SpanStore: Send + Sync + 'static {
1695    /// Ingest a batch of spans (idempotent on trace_id + span_id).
1696    async fn bulk_insert(&self, spans: Vec<crate::api::spans::SpanRecord>)
1697    -> anyhow::Result<usize>;
1698
1699    /// Fetch all spans for a trace or thread, ordered by start_time_ns asc.
1700    async fn list_spans(
1701        &self,
1702        workspace_id: &str,
1703        query: SpanQuery,
1704    ) -> anyhow::Result<Vec<crate::api::spans::SpanRecord>>;
1705
1706    /// Aggregate view: one row per trace (root span + per-trace stats).
1707    async fn list_traces(
1708        &self,
1709        workspace_id: &str,
1710        limit: i64,
1711    ) -> anyhow::Result<Vec<crate::api::spans::TraceRecord>>;
1712}
1713
#[cfg(test)]
mod tests {
    use super::*;

    /// The cloud wire format still carries legacy marketplace fields
    /// (is_public, star_count, etc.); they must be ignored rather than
    /// rejected during deserialization.
    #[test]
    fn test_skills_list_response_deserialize_cloud_format() {
        let payload = r#"{"skills":[{"id":"abc","workspace_slug":"ws","name":"test","full_name":"ws/test","description":"desc","tags":["t"],"is_public":true,"is_system":false,"is_owner":true,"star_count":0,"clone_count":0,"is_starred":false,"created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}]}"#;
        let parsed: SkillsListResponse = serde_json::from_str(payload).unwrap();
        assert_eq!(parsed.skills.len(), 1);

        let first = &parsed.skills[0];
        assert_eq!(first.name, "test");
        assert_eq!(first.workspace_slug, "ws");
        assert_eq!(first.full_name, "ws/test");
        assert!(first.is_owner);
    }

    /// A payload holding only id, name and the timestamps fills every
    /// other checked field with its default.
    #[test]
    fn test_skills_list_response_deserialize_defaults() {
        let payload = r#"{"skills":[{"id":"abc","name":"test","created_at":"2026-01-01T00:00:00Z","updated_at":"2026-01-01T00:00:00Z"}]}"#;
        let parsed: SkillsListResponse = serde_json::from_str(payload).unwrap();

        let first = &parsed.skills[0];
        assert_eq!(first.workspace_slug, "");
        assert_eq!(first.full_name, "");
        assert!(!first.is_owner);
        assert!(!first.is_workspace);
    }

    /// Serializing and then deserializing a response keeps the skill's
    /// name and its `is_workspace` flag.
    #[test]
    fn test_skills_list_response_roundtrip() {
        let item = SkillListItem {
            id: "id1".into(),
            workspace_slug: "local".into(),
            name: "my_skill".into(),
            full_name: "local/my_skill".into(),
            description: Some("A skill".into()),
            tags: vec!["tag1".into()],
            is_owner: true,
            is_workspace: true,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
        };
        let original = SkillsListResponse { skills: vec![item] };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: SkillsListResponse = serde_json::from_str(&encoded).unwrap();

        let first = &decoded.skills[0];
        assert_eq!(first.name, "my_skill");
        assert!(first.is_workspace);
    }
}