smooth_operator/adapter.rs
1//! The `StorageAdapter` seam.
2//!
3//! smooth-operator never names a database in application or agent code: everything
4//! goes through this one trait (see `docs/STORAGE.md`). Production backends
5//! (Postgres for k8s, DynamoDB for AWS serverless) implement it; the in-memory
6//! adapter in `adapters/in-memory` is the conformance baseline.
7//!
8//! The conversation / participant / message / session slices are async (their
9//! production backends are network calls). The checkpoint and knowledge slices
10//! are exposed as accessors returning smooth-operator's own
11//! [`CheckpointStore`](smooth_operator_core::CheckpointStore) and
12//! [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) — both *synchronous* traits
13//! in smooth-operator-core — so the engine plugs straight in without an adapter shim.
14
15use std::sync::Arc;
16
17use anyhow::Result;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20
21use smooth_operator_core::{CheckpointStore, KnowledgeBase};
22
23use crate::access_control::AccessContext;
24use crate::domain::{Conversation, Message, Participant, Session, SessionStatus};
25
26/// Partial update for a conversation. `None` fields are left unchanged.
27#[derive(Debug, Clone, Default, Serialize, Deserialize)]
28#[serde(rename_all = "camelCase")]
29pub struct ConversationUpdate {
30 pub name: Option<String>,
31 pub metadata_json: Option<serde_json::Value>,
32 pub analytics_json: Option<serde_json::Value>,
33}
34
35/// Partial update for a session (status / counters / activity timestamp).
36/// `None` fields are left unchanged.
37#[derive(Debug, Clone, Default, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct SessionUpdate {
40 pub status: Option<SessionStatus>,
41 pub token_count: Option<u64>,
42 pub message_count: Option<u64>,
43 pub last_activity_at: Option<chrono::DateTime<chrono::Utc>>,
44 pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
45}
46
47/// A page of messages, newest-or-oldest-first per the adapter's contract,
48/// with an opaque cursor for the next page (`None` when exhausted).
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct MessagePage {
51 pub messages: Vec<Message>,
52 /// Opaque cursor to pass back as `MessageQuery::cursor` for the next page.
53 pub next_cursor: Option<String>,
54}
55
56/// Paging / ordering parameters for `messages.list_by_conversation`.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct MessageQuery {
59 pub conversation_id: String,
60 /// Max messages to return in this page.
61 pub limit: usize,
62 /// Opaque cursor from a prior `MessagePage::next_cursor`.
63 pub cursor: Option<String>,
64 /// When true, return newest messages first (the common "recent" read).
65 pub descending: bool,
66}
67
68impl MessageQuery {
69 /// A first-page query for `conversation_id`, oldest-first.
70 pub fn new(conversation_id: impl Into<String>, limit: usize) -> Self {
71 Self {
72 conversation_id: conversation_id.into(),
73 limit,
74 cursor: None,
75 descending: false,
76 }
77 }
78}
79
80/// The single storage seam. All slices are backend-agnostic.
81#[async_trait]
82pub trait StorageAdapter: Send + Sync {
83 // ---- conversations ---------------------------------------------------
84
85 /// Create (or idempotently return) a conversation.
86 async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation>;
87
88 /// Fetch a conversation by id.
89 async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>>;
90
91 /// List conversations owned by an organization (newest first).
92 async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>>;
93
94 /// Apply a partial update to a conversation; returns the updated row.
95 async fn update_conversation(
96 &self,
97 id: &str,
98 update: ConversationUpdate,
99 ) -> Result<Conversation>;
100
101 // ---- participants ----------------------------------------------------
102
103 /// Add a participant to a conversation.
104 async fn add_participant(&self, participant: Participant) -> Result<Participant>;
105
106 /// Fetch a participant by id.
107 async fn get_participant(&self, id: &str) -> Result<Option<Participant>>;
108
109 /// List all participants in a conversation.
110 async fn list_participants_by_conversation(
111 &self,
112 conversation_id: &str,
113 ) -> Result<Vec<Participant>>;
114
115 /// Resolve a participant within a conversation by its external identity
116 /// (e.g. Supabase auth user id). Used to re-attach a returning user.
117 async fn resolve_participant_by_external_id(
118 &self,
119 conversation_id: &str,
120 external_id: &str,
121 ) -> Result<Option<Participant>>;
122
123 // ---- messages --------------------------------------------------------
124
125 /// Append a message to a conversation.
126 async fn append_message(&self, message: Message) -> Result<Message>;
127
128 /// Fetch a message by id.
129 async fn get_message(&self, id: &str) -> Result<Option<Message>>;
130
131 /// List messages in a conversation, paged.
132 async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage>;
133
134 // ---- sessions --------------------------------------------------------
135
136 /// Create a session (binds a conversation to a smooth-operator thread).
137 async fn create_session(&self, session: Session) -> Result<Session>;
138
139 /// Fetch a session by id.
140 async fn get_session(&self, session_id: &str) -> Result<Option<Session>>;
141
142 /// Apply a partial update (status / counts / activity) to a session.
143 async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session>;
144
145 /// List sessions attached to a conversation.
146 async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>>;
147
148 // ---- engine accessors ------------------------------------------------
149
150 /// The checkpoint store, ready to hand to a smooth-operator `Agent`
151 /// via `Agent::with_checkpoint_store`. Synchronous trait — the engine
152 /// calls it directly.
153 fn checkpoints(&self) -> Arc<dyn CheckpointStore>;
154
155 /// The knowledge base, ready to hand to a smooth-operator `AgentConfig`
156 /// via `AgentConfig::with_knowledge`. Synchronous trait.
157 ///
158 /// This handle performs **org isolation only** — it does not enforce
159 /// within-org document-level ACLs. The chat retrieval path MUST use
160 /// [`knowledge_for_access`](Self::knowledge_for_access) instead so a
161 /// restricted document (e.g. a private GitHub repo scoped to a group) is
162 /// never returned to a requester who lacks the entitlement.
163 fn knowledge(&self) -> Arc<dyn KnowledgeBase>;
164
165 /// An **ACL-enforcing** knowledge handle bound to the requester's
166 /// [`AccessContext`]: its `query` returns only documents the requester is
167 /// entitled to read (org-public docs, docs the requester's user id is on, or
168 /// docs any of the requester's groups is on). This is the handle the chat
169 /// retrieval path (auto-injected context **and** the `knowledge_search`
170 /// tool) MUST read through — see `docs/ACCESS-CONTROL.md`.
171 ///
172 /// ## Default — **fail closed for ACL'd content**
173 ///
174 /// The default implementation wraps [`knowledge`](Self::knowledge) in an
175 /// [`AclKnowledgeStore`](crate::access_control::AclKnowledgeStore) reader.
176 /// Because that wrapper's ACL side table starts empty (the documents were
177 /// ingested through a different store instance), every document it sees is
178 /// treated as org-public — which is the *raw* `knowledge()` behavior and is
179 /// therefore **not** a regression, but also offers no within-org protection.
180 /// Backends that can persist + read back a document's ACL (the in-memory
181 /// adapter via a shared store; Postgres / DynamoDB via a stored ACL column)
182 /// **override** this method to enforce the ACL durably, so restricted docs
183 /// are dropped for unentitled requesters even across the ingest→serve
184 /// process boundary.
185 fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
186 crate::access_control::AclKnowledgeStore::new(self.knowledge()).reader(access.clone())
187 }
188}