Skip to main content

smooth_operator/
adapter.rs

//! The `StorageAdapter` seam.
//!
//! smooth-operator never names a database in application or agent code: everything
//! goes through this one trait (see `docs/STORAGE.md`). Production backends
//! (Postgres for k8s, DynamoDB for AWS serverless) implement it; the in-memory
//! adapter in `adapters/in-memory` is the conformance baseline.
//!
//! The conversation / participant / message / session slices are async (their
//! production backends are network calls). The checkpoint and knowledge slices
//! are exposed as accessors returning smooth-operator's own
//! [`CheckpointStore`](smooth_operator_core::CheckpointStore) and
//! [`KnowledgeBase`](smooth_operator_core::KnowledgeBase) — both *synchronous* traits
//! in smooth-operator-core — so the engine plugs straight in without an adapter shim.
14
15use std::sync::Arc;
16
17use anyhow::Result;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20
21use smooth_operator_core::{CheckpointStore, KnowledgeBase};
22
23use crate::access_control::AccessContext;
24use crate::domain::{Conversation, Message, Participant, Session, SessionStatus};
25
26/// Partial update for a conversation. `None` fields are left unchanged.
27#[derive(Debug, Clone, Default, Serialize, Deserialize)]
28#[serde(rename_all = "camelCase")]
29pub struct ConversationUpdate {
30    pub name: Option<String>,
31    pub metadata_json: Option<serde_json::Value>,
32    pub analytics_json: Option<serde_json::Value>,
33}
34
35/// Partial update for a session (status / counters / activity timestamp).
36/// `None` fields are left unchanged.
37#[derive(Debug, Clone, Default, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct SessionUpdate {
40    pub status: Option<SessionStatus>,
41    pub token_count: Option<u64>,
42    pub message_count: Option<u64>,
43    pub last_activity_at: Option<chrono::DateTime<chrono::Utc>>,
44    pub ended_at: Option<chrono::DateTime<chrono::Utc>>,
45}
46
47/// A page of messages, newest-or-oldest-first per the adapter's contract,
48/// with an opaque cursor for the next page (`None` when exhausted).
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct MessagePage {
51    pub messages: Vec<Message>,
52    /// Opaque cursor to pass back as `MessageQuery::cursor` for the next page.
53    pub next_cursor: Option<String>,
54}
55
56/// Paging / ordering parameters for `messages.list_by_conversation`.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct MessageQuery {
59    pub conversation_id: String,
60    /// Max messages to return in this page.
61    pub limit: usize,
62    /// Opaque cursor from a prior `MessagePage::next_cursor`.
63    pub cursor: Option<String>,
64    /// When true, return newest messages first (the common "recent" read).
65    pub descending: bool,
66}
67
68impl MessageQuery {
69    /// A first-page query for `conversation_id`, oldest-first.
70    pub fn new(conversation_id: impl Into<String>, limit: usize) -> Self {
71        Self {
72            conversation_id: conversation_id.into(),
73            limit,
74            cursor: None,
75            descending: false,
76        }
77    }
78}
79
80/// The single storage seam. All slices are backend-agnostic.
81#[async_trait]
82pub trait StorageAdapter: Send + Sync {
83    // ---- conversations ---------------------------------------------------
84
85    /// Create (or idempotently return) a conversation.
86    async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation>;
87
88    /// Fetch a conversation by id.
89    async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>>;
90
91    /// List conversations owned by an organization (newest first).
92    async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>>;
93
94    /// Apply a partial update to a conversation; returns the updated row.
95    async fn update_conversation(
96        &self,
97        id: &str,
98        update: ConversationUpdate,
99    ) -> Result<Conversation>;
100
101    // ---- participants ----------------------------------------------------
102
103    /// Add a participant to a conversation.
104    async fn add_participant(&self, participant: Participant) -> Result<Participant>;
105
106    /// Fetch a participant by id.
107    async fn get_participant(&self, id: &str) -> Result<Option<Participant>>;
108
109    /// List all participants in a conversation.
110    async fn list_participants_by_conversation(
111        &self,
112        conversation_id: &str,
113    ) -> Result<Vec<Participant>>;
114
115    /// Resolve a participant within a conversation by its external identity
116    /// (e.g. Supabase auth user id). Used to re-attach a returning user.
117    async fn resolve_participant_by_external_id(
118        &self,
119        conversation_id: &str,
120        external_id: &str,
121    ) -> Result<Option<Participant>>;
122
123    // ---- messages --------------------------------------------------------
124
125    /// Append a message to a conversation.
126    async fn append_message(&self, message: Message) -> Result<Message>;
127
128    /// Fetch a message by id.
129    async fn get_message(&self, id: &str) -> Result<Option<Message>>;
130
131    /// List messages in a conversation, paged.
132    async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage>;
133
134    // ---- sessions --------------------------------------------------------
135
136    /// Create a session (binds a conversation to a smooth-operator thread).
137    async fn create_session(&self, session: Session) -> Result<Session>;
138
139    /// Fetch a session by id.
140    async fn get_session(&self, session_id: &str) -> Result<Option<Session>>;
141
142    /// Apply a partial update (status / counts / activity) to a session.
143    async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session>;
144
145    /// List sessions attached to a conversation.
146    async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>>;
147
148    // ---- engine accessors ------------------------------------------------
149
150    /// The checkpoint store, ready to hand to a smooth-operator `Agent`
151    /// via `Agent::with_checkpoint_store`. Synchronous trait — the engine
152    /// calls it directly.
153    fn checkpoints(&self) -> Arc<dyn CheckpointStore>;
154
155    /// The knowledge base, ready to hand to a smooth-operator `AgentConfig`
156    /// via `AgentConfig::with_knowledge`. Synchronous trait.
157    ///
158    /// This handle performs **org isolation only** — it does not enforce
159    /// within-org document-level ACLs. The chat retrieval path MUST use
160    /// [`knowledge_for_access`](Self::knowledge_for_access) instead so a
161    /// restricted document (e.g. a private GitHub repo scoped to a group) is
162    /// never returned to a requester who lacks the entitlement.
163    fn knowledge(&self) -> Arc<dyn KnowledgeBase>;
164
165    /// An **ACL-enforcing** knowledge handle bound to the requester's
166    /// [`AccessContext`]: its `query` returns only documents the requester is
167    /// entitled to read (org-public docs, docs the requester's user id is on, or
168    /// docs any of the requester's groups is on). This is the handle the chat
169    /// retrieval path (auto-injected context **and** the `knowledge_search`
170    /// tool) MUST read through — see `docs/ACCESS-CONTROL.md`.
171    ///
172    /// ## Default — **fail closed for ACL'd content**
173    ///
174    /// The default implementation wraps [`knowledge`](Self::knowledge) in an
175    /// [`AclKnowledgeStore`](crate::access_control::AclKnowledgeStore) reader.
176    /// Because that wrapper's ACL side table starts empty (the documents were
177    /// ingested through a different store instance), every document it sees is
178    /// treated as org-public — which is the *raw* `knowledge()` behavior and is
179    /// therefore **not** a regression, but also offers no within-org protection.
180    /// Backends that can persist + read back a document's ACL (the in-memory
181    /// adapter via a shared store; Postgres / DynamoDB via a stored ACL column)
182    /// **override** this method to enforce the ACL durably, so restricted docs
183    /// are dropped for unentitled requesters even across the ingest→serve
184    /// process boundary.
185    fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
186        crate::access_control::AclKnowledgeStore::new(self.knowledge()).reader(access.clone())
187    }
188}