1mod admin;
39mod agent_config;
40mod embedder;
41mod knowledge;
42mod memory;
43mod reranker;
44mod schema;
45
46use std::sync::Arc;
47
48use anyhow::{anyhow, Context, Result};
49use async_trait::async_trait;
50use chrono::Utc;
51use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
52use tokio_postgres::NoTls;
53
54use smooth_operator::access_control::AccessContext;
55use smooth_operator::adapter::{
56 ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
57};
58use smooth_operator::domain::{
59 Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
60 Platform, Session, SessionStatus,
61};
62use smooth_operator_core::checkpoint::PostgresCheckpointStore;
63use smooth_operator_core::{CheckpointStore, KnowledgeBase};
64
65pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
70pub use agent_config::PgAgentConfigResolver;
71pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
72pub use knowledge::PgKnowledgeBase;
73pub use memory::PgMemory;
74pub use reranker::{
75 GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
76};
77pub use smooth_operator::embedding::{
78 DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
79};
80
81pub struct PostgresAdapter {
83 pool: Pool,
84 checkpoints: Option<Arc<PostgresCheckpointStore>>,
90 knowledge: Arc<PgKnowledgeBase>,
91 embedder: Arc<dyn Embedder>,
94 embedding_dim: usize,
95 handle: tokio::runtime::Handle,
98}
99
100impl Drop for PostgresAdapter {
101 fn drop(&mut self) {
102 if let Some(checkpoints) = self.checkpoints.take() {
103 if let Ok(handle) = std::thread::Builder::new()
109 .name("pg-checkpoint-drop".into())
110 .spawn(move || drop(checkpoints))
111 {
112 let _ = handle.join();
113 }
114 }
115 }
116}
117
118impl PostgresAdapter {
119 pub async fn connect(conn_str: &str) -> Result<Self> {
129 Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
130 }
131
132 pub async fn connect_with_embedder(
139 conn_str: &str,
140 embedder: Arc<dyn Embedder>,
141 ) -> Result<Self> {
142 let embedding_dim = embedder.dim();
143
144 let pg_config: tokio_postgres::Config = conn_str
146 .parse()
147 .context("parsing connection string for async pool")?;
148 let mut cfg = PoolConfig::new();
149 cfg.manager = Some(ManagerConfig {
150 recycling_method: RecyclingMethod::Fast,
151 });
152 cfg.dbname = pg_config.get_dbname().map(str::to_string);
154 cfg.user = pg_config.get_user().map(str::to_string);
155 cfg.password = pg_config
156 .get_password()
157 .map(|p| String::from_utf8_lossy(p).into_owned());
158 if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
159 tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
160 tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
161 }) {
162 cfg.host = Some(host);
163 }
164 if let Some(port) = pg_config.get_ports().first().copied() {
165 cfg.port = Some(port);
166 }
167 let pool = cfg
168 .create_pool(Some(Runtime::Tokio1), NoTls)
169 .context("building deadpool")?;
170
171 {
173 let client = pool
174 .get()
175 .await
176 .context("acquiring connection for migration")?;
177 client
178 .batch_execute(schema::OLTP_SCHEMA)
179 .await
180 .context("applying OLTP schema")?;
181 client
182 .batch_execute(schema::ADMIN_SCHEMA)
183 .await
184 .context("applying admin schema")?;
185 client
186 .batch_execute(schema::VECTOR_EXTENSION)
187 .await
188 .context("creating pgvector extension")?;
189 client
190 .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
191 .await
192 .context("applying knowledge_vectors schema")?;
193 client
194 .batch_execute(&schema::memories_schema(embedding_dim))
195 .await
196 .context("applying memories schema")?;
197 }
198
199 let cs_conn = conn_str.to_string();
203 let checkpoints =
204 tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
205 .await
206 .context("checkpoint store setup task panicked")?
207 .context("constructing PostgresCheckpointStore")?;
208 let checkpoints = Arc::new(checkpoints);
209
210 let handle = tokio::runtime::Handle::current();
212 let knowledge = Arc::new(PgKnowledgeBase::new(
213 pool.clone(),
214 embedder.clone(),
215 handle.clone(),
216 None,
217 ));
218
219 Ok(Self {
220 pool,
221 checkpoints: Some(checkpoints),
222 knowledge,
223 embedder,
224 embedding_dim,
225 handle,
226 })
227 }
228
229 pub async fn from_env() -> Result<Self> {
235 let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
236 .or_else(|_| std::env::var("DATABASE_URL"))
237 .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
238 Self::connect(&conn_str).await
239 }
240
241 #[must_use]
243 pub fn embedding_dim(&self) -> usize {
244 self.embedding_dim
245 }
246
247 #[must_use]
251 pub fn connector_config_store(&self) -> PgConnectorConfigStore {
252 PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
253 }
254
255 #[must_use]
258 pub fn settings_store(&self) -> PgSettingsStore {
259 PgSettingsStore::new(self.pool.clone(), self.handle.clone())
260 }
261
262 #[must_use]
268 pub fn agent_config_resolver(&self) -> PgAgentConfigResolver {
269 PgAgentConfigResolver::new(self.pool.clone())
270 }
271
272 #[must_use]
275 pub fn indexing_store(&self) -> PgIndexingStore {
276 PgIndexingStore::new(self.pool.clone(), self.handle.clone())
277 }
278
279 #[must_use]
289 pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
290 PgMemory::new(
291 self.pool.clone(),
292 self.embedder.clone(),
293 self.handle.clone(),
294 organization_id,
295 user_id,
296 )
297 }
298}
299
300fn platform_to_str(p: Platform) -> &'static str {
303 match p {
304 Platform::Web => "web",
305 Platform::Messenger => "messenger",
306 Platform::Instagram => "instagram",
307 Platform::Email => "email",
308 Platform::Discord => "discord",
309 Platform::Phone => "phone",
310 Platform::Sms => "sms",
311 Platform::Slack => "slack",
312 Platform::Whatsapp => "whatsapp",
313 Platform::Tiktok => "tiktok",
314 }
315}
316
317fn platform_from_str(s: &str) -> Result<Platform> {
318 Ok(match s {
319 "web" => Platform::Web,
320 "messenger" => Platform::Messenger,
321 "instagram" => Platform::Instagram,
322 "email" => Platform::Email,
323 "discord" => Platform::Discord,
324 "phone" => Platform::Phone,
325 "sms" => Platform::Sms,
326 "slack" => Platform::Slack,
327 "whatsapp" => Platform::Whatsapp,
328 "tiktok" => Platform::Tiktok,
329 other => return Err(anyhow!("unknown platform '{other}'")),
330 })
331}
332
333fn participant_type_to_str(t: ParticipantType) -> &'static str {
334 match t {
335 ParticipantType::User => "user",
336 ParticipantType::AiAgent => "ai-agent",
337 ParticipantType::HumanAgent => "human-agent",
338 }
339}
340
341fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
342 Ok(match s {
343 "user" => ParticipantType::User,
344 "ai-agent" => ParticipantType::AiAgent,
345 "human-agent" => ParticipantType::HumanAgent,
346 other => return Err(anyhow!("unknown participant type '{other}'")),
347 })
348}
349
350fn direction_to_str(d: Direction) -> &'static str {
351 match d {
352 Direction::Inbound => "inbound",
353 Direction::Outbound => "outbound",
354 }
355}
356
357fn direction_from_str(s: &str) -> Result<Direction> {
358 Ok(match s {
359 "inbound" => Direction::Inbound,
360 "outbound" => Direction::Outbound,
361 other => return Err(anyhow!("unknown direction '{other}'")),
362 })
363}
364
365fn session_status_to_str(s: SessionStatus) -> &'static str {
366 match s {
367 SessionStatus::Active => "active",
368 SessionStatus::Idle => "idle",
369 SessionStatus::Ended => "ended",
370 }
371}
372
373fn session_status_from_str(s: &str) -> Result<SessionStatus> {
374 Ok(match s {
375 "active" => SessionStatus::Active,
376 "idle" => SessionStatus::Idle,
377 "ended" => SessionStatus::Ended,
378 other => return Err(anyhow!("unknown session status '{other}'")),
379 })
380}
381
382fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
383 Ok(Conversation {
384 id: row.get("id"),
385 platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
386 name: row.get("name"),
387 organization_id: row.get("organization_id"),
388 idempotency_key: row.get("idempotency_key"),
389 metadata_json: row.get("metadata_json"),
390 analytics_json: row.get("analytics_json"),
391 created_at: row.get("created_at"),
392 updated_at: row.get("updated_at"),
393 })
394}
395
396fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
397 Ok(Participant {
398 id: row.get("id"),
399 conversation_id: row.get("conversation_id"),
400 organization_id: row.get("organization_id"),
401 participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
402 external_id: row.get("external_id"),
403 internal_id: row.get("internal_id"),
404 browser_fingerprint: row.get("browser_fingerprint"),
405 browser_info: row.get("browser_info"),
406 name: row.get("name"),
407 email: row.get("email"),
408 phone: row.get("phone"),
409 crm_contact_id: row.get("crm_contact_id"),
410 metadata_json: row.get("metadata_json"),
411 created_at: row.get("created_at"),
412 updated_at: row.get("updated_at"),
413 })
414}
415
416fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
417 let content: serde_json::Value = row.get("content");
418 let content: MessageContent =
419 serde_json::from_value(content).context("decoding message content")?;
420 let from: Option<serde_json::Value> = row.get("from_ref");
421 let to: Option<serde_json::Value> = row.get("to_ref");
422 let from: Option<ParticipantRef> = from
423 .map(serde_json::from_value)
424 .transpose()
425 .context("decoding from_ref")?;
426 let to: Option<ParticipantRef> = to
427 .map(serde_json::from_value)
428 .transpose()
429 .context("decoding to_ref")?;
430 Ok(Message {
431 id: row.get("id"),
432 external_id: row.get("external_id"),
433 organization_id: row.get("organization_id"),
434 conversation_id: row.get("conversation_id"),
435 direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
436 content,
437 from,
438 to,
439 metadata_json: row.get("metadata_json"),
440 analytics_json: row.get("analytics_json"),
441 created_at: row.get("created_at"),
442 updated_at: row.get("updated_at"),
443 })
444}
445
446fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
447 let status: Option<String> = row.get("status");
448 let status = status.map(|s| session_status_from_str(&s)).transpose()?;
449 let token_count: Option<i64> = row.get("token_count");
450 let message_count: Option<i64> = row.get("message_count");
451 let metadata: Option<serde_json::Value> = row.get("metadata");
452 let metadata = metadata
453 .map(serde_json::from_value)
454 .transpose()
455 .context("decoding session metadata")?;
456 Ok(Session {
457 session_id: row.get("session_id"),
458 conversation_id: row.get("conversation_id"),
459 organization_id: row.get("organization_id"),
460 agent_id: row.get("agent_id"),
461 agent_name: row.get("agent_name"),
462 user_participant_id: row.get("user_participant_id"),
463 agent_participant_id: row.get("agent_participant_id"),
464 thread_id: row.get("thread_id"),
465 status,
466 token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
467 message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
468 metadata,
469 created_at: row.get("created_at"),
470 updated_at: row.get("updated_at"),
471 ended_at: row.get("ended_at"),
472 last_activity_at: row.get("last_activity_at"),
473 })
474}
475
476#[async_trait]
477impl StorageAdapter for PostgresAdapter {
478 async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
481 let client = self.pool.get().await?;
482 client
485 .execute(
486 "INSERT INTO conversations
487 (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
488 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
489 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
490 &[
491 &conversation.id,
492 &platform_to_str(conversation.platform),
493 &conversation.name,
494 &conversation.organization_id,
495 &conversation.idempotency_key,
496 &conversation.metadata_json,
497 &conversation.analytics_json,
498 &conversation.created_at,
499 &conversation.updated_at,
500 ],
501 )
502 .await?;
503 let row = client
504 .query_one(
505 "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
506 &[&conversation.organization_id, &conversation.idempotency_key],
507 )
508 .await?;
509 row_to_conversation(&row)
510 }
511
512 async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
513 let client = self.pool.get().await?;
514 let row = client
515 .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
516 .await?;
517 row.as_ref().map(row_to_conversation).transpose()
518 }
519
520 async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
521 let client = self.pool.get().await?;
522 let rows = client
523 .query(
524 "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
525 &[&organization_id],
526 )
527 .await?;
528 rows.iter().map(row_to_conversation).collect()
529 }
530
531 async fn update_conversation(
532 &self,
533 id: &str,
534 update: ConversationUpdate,
535 ) -> Result<Conversation> {
536 let client = self.pool.get().await?;
537 let now = Utc::now();
538 let set_metadata = update.metadata_json.is_some();
542 let set_analytics = update.analytics_json.is_some();
543 let row = client
544 .query_one(
545 "UPDATE conversations SET
546 name = COALESCE($2, name),
547 metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
548 analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
549 updated_at = $7
550 WHERE id = $1
551 RETURNING *",
552 &[
553 &id,
554 &update.name,
555 &set_metadata,
556 &update.metadata_json,
557 &set_analytics,
558 &update.analytics_json,
559 &now,
560 ],
561 )
562 .await
563 .with_context(|| format!("conversation '{id}' not found"))?;
564 row_to_conversation(&row)
565 }
566
567 async fn add_participant(&self, participant: Participant) -> Result<Participant> {
570 let client = self.pool.get().await?;
571 client
572 .execute(
573 "INSERT INTO conversation_participants
574 (id, conversation_id, organization_id, type, external_id, internal_id,
575 browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
576 metadata_json, created_at, updated_at)
577 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
578 &[
579 &participant.id,
580 &participant.conversation_id,
581 &participant.organization_id,
582 &participant_type_to_str(participant.participant_type),
583 &participant.external_id,
584 &participant.internal_id,
585 &participant.browser_fingerprint,
586 &participant.browser_info,
587 &participant.name,
588 &participant.email,
589 &participant.phone,
590 &participant.crm_contact_id,
591 &participant.metadata_json,
592 &participant.created_at,
593 &participant.updated_at,
594 ],
595 )
596 .await?;
597 Ok(participant)
598 }
599
600 async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
601 let client = self.pool.get().await?;
602 let row = client
603 .query_opt(
604 "SELECT * FROM conversation_participants WHERE id = $1",
605 &[&id],
606 )
607 .await?;
608 row.as_ref().map(row_to_participant).transpose()
609 }
610
611 async fn list_participants_by_conversation(
612 &self,
613 conversation_id: &str,
614 ) -> Result<Vec<Participant>> {
615 let client = self.pool.get().await?;
616 let rows = client
617 .query(
618 "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
619 &[&conversation_id],
620 )
621 .await?;
622 rows.iter().map(row_to_participant).collect()
623 }
624
625 async fn resolve_participant_by_external_id(
626 &self,
627 conversation_id: &str,
628 external_id: &str,
629 ) -> Result<Option<Participant>> {
630 let client = self.pool.get().await?;
631 let row = client
632 .query_opt(
633 "SELECT * FROM conversation_participants
634 WHERE conversation_id = $1 AND external_id = $2
635 ORDER BY created_at LIMIT 1",
636 &[&conversation_id, &external_id],
637 )
638 .await?;
639 row.as_ref().map(row_to_participant).transpose()
640 }
641
642 async fn append_message(&self, message: Message) -> Result<Message> {
645 let client = self.pool.get().await?;
646 let content = serde_json::to_value(&message.content)?;
647 let from = message
648 .from
649 .as_ref()
650 .map(serde_json::to_value)
651 .transpose()?;
652 let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
653 client
654 .execute(
655 "INSERT INTO conversation_messages
656 (id, external_id, organization_id, conversation_id, direction, content,
657 from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
658 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
659 &[
660 &message.id,
661 &message.external_id,
662 &message.organization_id,
663 &message.conversation_id,
664 &direction_to_str(message.direction),
665 &content,
666 &from,
667 &to,
668 &message.metadata_json,
669 &message.analytics_json,
670 &message.created_at,
671 &message.updated_at,
672 ],
673 )
674 .await?;
675 Ok(message)
676 }
677
678 async fn get_message(&self, id: &str) -> Result<Option<Message>> {
679 let client = self.pool.get().await?;
680 let row = client
681 .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
682 .await?;
683 row.as_ref().map(row_to_message).transpose()
684 }
685
686 async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
687 let client = self.pool.get().await?;
688 let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
689
690 let cursor_seq: Option<i64> = match &query.cursor {
693 Some(cursor) => {
694 let row = client
695 .query_opt(
696 "SELECT seq FROM conversation_messages WHERE id = $1",
697 &[&cursor],
698 )
699 .await?;
700 row.map(|r| r.get::<_, i64>("seq"))
701 }
702 None => None,
703 };
704
705 let probe = limit_i64.saturating_add(1);
708 let rows = if query.descending {
709 match cursor_seq {
711 Some(seq) => {
712 client
713 .query(
714 "SELECT * FROM conversation_messages
715 WHERE conversation_id = $1 AND seq < $2
716 ORDER BY seq DESC LIMIT $3",
717 &[&query.conversation_id, &seq, &probe],
718 )
719 .await?
720 }
721 None => {
722 client
723 .query(
724 "SELECT * FROM conversation_messages
725 WHERE conversation_id = $1
726 ORDER BY seq DESC LIMIT $2",
727 &[&query.conversation_id, &probe],
728 )
729 .await?
730 }
731 }
732 } else {
733 match cursor_seq {
735 Some(seq) => {
736 client
737 .query(
738 "SELECT * FROM conversation_messages
739 WHERE conversation_id = $1 AND seq > $2
740 ORDER BY seq ASC LIMIT $3",
741 &[&query.conversation_id, &seq, &probe],
742 )
743 .await?
744 }
745 None => {
746 client
747 .query(
748 "SELECT * FROM conversation_messages
749 WHERE conversation_id = $1
750 ORDER BY seq ASC LIMIT $2",
751 &[&query.conversation_id, &probe],
752 )
753 .await?
754 }
755 }
756 };
757
758 let has_more = rows.len() as i64 > limit_i64;
759 let page_rows = if has_more {
760 &rows[..query.limit]
761 } else {
762 &rows[..]
763 };
764 let messages: Vec<Message> = page_rows
765 .iter()
766 .map(row_to_message)
767 .collect::<Result<_>>()?;
768 let next_cursor = if has_more {
769 messages.last().map(|m| m.id.clone())
770 } else {
771 None
772 };
773
774 Ok(MessagePage {
775 messages,
776 next_cursor,
777 })
778 }
779
780 async fn create_session(&self, session: Session) -> Result<Session> {
783 let client = self.pool.get().await?;
784 let status = session.status.map(session_status_to_str);
785 let token_count = session
786 .token_count
787 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
788 let message_count = session
789 .message_count
790 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
791 let metadata = session
792 .metadata
793 .as_ref()
794 .map(serde_json::to_value)
795 .transpose()?;
796 client
797 .execute(
798 "INSERT INTO conversation_sessions
799 (session_id, conversation_id, organization_id, agent_id, agent_name,
800 user_participant_id, agent_participant_id, thread_id, status, token_count,
801 message_count, metadata, created_at, updated_at, ended_at, last_activity_at)
802 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)",
803 &[
804 &session.session_id,
805 &session.conversation_id,
806 &session.organization_id,
807 &session.agent_id,
808 &session.agent_name,
809 &session.user_participant_id,
810 &session.agent_participant_id,
811 &session.thread_id,
812 &status,
813 &token_count,
814 &message_count,
815 &metadata,
816 &session.created_at,
817 &session.updated_at,
818 &session.ended_at,
819 &session.last_activity_at,
820 ],
821 )
822 .await?;
823 Ok(session)
824 }
825
826 async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
827 let client = self.pool.get().await?;
828 let row = client
829 .query_opt(
830 "SELECT * FROM conversation_sessions WHERE session_id = $1",
831 &[&session_id],
832 )
833 .await?;
834 row.as_ref().map(row_to_session).transpose()
835 }
836
837 async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
838 let client = self.pool.get().await?;
839 let now = Utc::now();
840 let status = update.status.map(session_status_to_str);
841 let token_count = update
842 .token_count
843 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
844 let message_count = update
845 .message_count
846 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
847 let set_last_activity = update.last_activity_at.is_some();
849 let set_ended = update.ended_at.is_some();
850 let row = client
851 .query_one(
852 "UPDATE conversation_sessions SET
853 status = COALESCE($2, status),
854 token_count = COALESCE($3, token_count),
855 message_count = COALESCE($4, message_count),
856 last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
857 ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
858 updated_at = $9
859 WHERE session_id = $1
860 RETURNING *",
861 &[
862 &session_id,
863 &status,
864 &token_count,
865 &message_count,
866 &set_last_activity,
867 &update.last_activity_at,
868 &set_ended,
869 &update.ended_at,
870 &now,
871 ],
872 )
873 .await
874 .with_context(|| format!("session '{session_id}' not found"))?;
875 row_to_session(&row)
876 }
877
878 async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
879 let client = self.pool.get().await?;
880 let rows = client
881 .query(
882 "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
883 &[&conversation_id],
884 )
885 .await?;
886 rows.iter().map(row_to_session).collect()
887 }
888
889 fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
892 self.checkpoints
894 .as_ref()
895 .expect("checkpoint store present")
896 .clone()
897 }
898
899 fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
900 self.knowledge.clone()
901 }
902
903 fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
904 Arc::new(self.knowledge.with_access(access.clone()))
910 }
911}