1mod admin;
39mod embedder;
40mod knowledge;
41mod memory;
42mod reranker;
43mod schema;
44
45use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use async_trait::async_trait;
49use chrono::Utc;
50use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
51use tokio_postgres::NoTls;
52
53use smooth_operator::access_control::AccessContext;
54use smooth_operator::adapter::{
55 ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
56};
57use smooth_operator::domain::{
58 Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
59 Platform, Session, SessionStatus,
60};
61use smooth_operator_core::checkpoint::PostgresCheckpointStore;
62use smooth_operator_core::{CheckpointStore, KnowledgeBase};
63
64pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
69pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
70pub use knowledge::PgKnowledgeBase;
71pub use memory::PgMemory;
72pub use reranker::{
73 GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
74};
75pub use smooth_operator::embedding::{
76 DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
77};
78
/// Postgres-backed adapter that owns the connection pool together with the
/// checkpoint store, knowledge base, and embedder built on top of it.
pub struct PostgresAdapter {
    /// Async connection pool used by every query issued through the adapter.
    pool: Pool,
    /// Checkpoint store. Held in an `Option` so the `Drop` impl can `take()`
    /// it and release it on a separate thread.
    checkpoints: Option<Arc<PostgresCheckpointStore>>,
    /// Knowledge base constructed from `pool`, `embedder`, and `handle`.
    knowledge: Arc<PgKnowledgeBase>,
    /// Embedder shared with the knowledge base and with `PgMemory` instances.
    embedder: Arc<dyn Embedder>,
    /// Value of `embedder.dim()` captured when the adapter was connected.
    embedding_dim: usize,
    /// Handle to the Tokio runtime that was current when the adapter was built.
    handle: tokio::runtime::Handle,
}
97
98impl Drop for PostgresAdapter {
99 fn drop(&mut self) {
100 if let Some(checkpoints) = self.checkpoints.take() {
101 if let Ok(handle) = std::thread::Builder::new()
107 .name("pg-checkpoint-drop".into())
108 .spawn(move || drop(checkpoints))
109 {
110 let _ = handle.join();
111 }
112 }
113 }
114}
115
116impl PostgresAdapter {
117 pub async fn connect(conn_str: &str) -> Result<Self> {
127 Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
128 }
129
130 pub async fn connect_with_embedder(
137 conn_str: &str,
138 embedder: Arc<dyn Embedder>,
139 ) -> Result<Self> {
140 let embedding_dim = embedder.dim();
141
142 let pg_config: tokio_postgres::Config = conn_str
144 .parse()
145 .context("parsing connection string for async pool")?;
146 let mut cfg = PoolConfig::new();
147 cfg.manager = Some(ManagerConfig {
148 recycling_method: RecyclingMethod::Fast,
149 });
150 cfg.dbname = pg_config.get_dbname().map(str::to_string);
152 cfg.user = pg_config.get_user().map(str::to_string);
153 cfg.password = pg_config
154 .get_password()
155 .map(|p| String::from_utf8_lossy(p).into_owned());
156 if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
157 tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
158 tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
159 }) {
160 cfg.host = Some(host);
161 }
162 if let Some(port) = pg_config.get_ports().first().copied() {
163 cfg.port = Some(port);
164 }
165 let pool = cfg
166 .create_pool(Some(Runtime::Tokio1), NoTls)
167 .context("building deadpool")?;
168
169 {
171 let client = pool
172 .get()
173 .await
174 .context("acquiring connection for migration")?;
175 client
176 .batch_execute(schema::OLTP_SCHEMA)
177 .await
178 .context("applying OLTP schema")?;
179 client
180 .batch_execute(schema::ADMIN_SCHEMA)
181 .await
182 .context("applying admin schema")?;
183 client
184 .batch_execute(schema::VECTOR_EXTENSION)
185 .await
186 .context("creating pgvector extension")?;
187 client
188 .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
189 .await
190 .context("applying knowledge_vectors schema")?;
191 client
192 .batch_execute(&schema::memories_schema(embedding_dim))
193 .await
194 .context("applying memories schema")?;
195 }
196
197 let cs_conn = conn_str.to_string();
201 let checkpoints =
202 tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
203 .await
204 .context("checkpoint store setup task panicked")?
205 .context("constructing PostgresCheckpointStore")?;
206 let checkpoints = Arc::new(checkpoints);
207
208 let handle = tokio::runtime::Handle::current();
210 let knowledge = Arc::new(PgKnowledgeBase::new(
211 pool.clone(),
212 embedder.clone(),
213 handle.clone(),
214 None,
215 ));
216
217 Ok(Self {
218 pool,
219 checkpoints: Some(checkpoints),
220 knowledge,
221 embedder,
222 embedding_dim,
223 handle,
224 })
225 }
226
227 pub async fn from_env() -> Result<Self> {
233 let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
234 .or_else(|_| std::env::var("DATABASE_URL"))
235 .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
236 Self::connect(&conn_str).await
237 }
238
    /// Returns the embedding dimension recorded when the adapter was connected
    /// (the value `embedder.dim()` reported at that time).
    #[must_use]
    pub fn embedding_dim(&self) -> usize {
        self.embedding_dim
    }
244
245 #[must_use]
249 pub fn connector_config_store(&self) -> PgConnectorConfigStore {
250 PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
251 }
252
253 #[must_use]
256 pub fn settings_store(&self) -> PgSettingsStore {
257 PgSettingsStore::new(self.pool.clone(), self.handle.clone())
258 }
259
260 #[must_use]
263 pub fn indexing_store(&self) -> PgIndexingStore {
264 PgIndexingStore::new(self.pool.clone(), self.handle.clone())
265 }
266
267 #[must_use]
277 pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
278 PgMemory::new(
279 self.pool.clone(),
280 self.embedder.clone(),
281 self.handle.clone(),
282 organization_id,
283 user_id,
284 )
285 }
286}
287
288fn platform_to_str(p: Platform) -> &'static str {
291 match p {
292 Platform::Web => "web",
293 Platform::Messenger => "messenger",
294 Platform::Instagram => "instagram",
295 Platform::Email => "email",
296 Platform::Discord => "discord",
297 Platform::Phone => "phone",
298 Platform::Sms => "sms",
299 Platform::Slack => "slack",
300 Platform::Whatsapp => "whatsapp",
301 Platform::Tiktok => "tiktok",
302 }
303}
304
305fn platform_from_str(s: &str) -> Result<Platform> {
306 Ok(match s {
307 "web" => Platform::Web,
308 "messenger" => Platform::Messenger,
309 "instagram" => Platform::Instagram,
310 "email" => Platform::Email,
311 "discord" => Platform::Discord,
312 "phone" => Platform::Phone,
313 "sms" => Platform::Sms,
314 "slack" => Platform::Slack,
315 "whatsapp" => Platform::Whatsapp,
316 "tiktok" => Platform::Tiktok,
317 other => return Err(anyhow!("unknown platform '{other}'")),
318 })
319}
320
321fn participant_type_to_str(t: ParticipantType) -> &'static str {
322 match t {
323 ParticipantType::User => "user",
324 ParticipantType::AiAgent => "ai-agent",
325 ParticipantType::HumanAgent => "human-agent",
326 }
327}
328
329fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
330 Ok(match s {
331 "user" => ParticipantType::User,
332 "ai-agent" => ParticipantType::AiAgent,
333 "human-agent" => ParticipantType::HumanAgent,
334 other => return Err(anyhow!("unknown participant type '{other}'")),
335 })
336}
337
338fn direction_to_str(d: Direction) -> &'static str {
339 match d {
340 Direction::Inbound => "inbound",
341 Direction::Outbound => "outbound",
342 }
343}
344
345fn direction_from_str(s: &str) -> Result<Direction> {
346 Ok(match s {
347 "inbound" => Direction::Inbound,
348 "outbound" => Direction::Outbound,
349 other => return Err(anyhow!("unknown direction '{other}'")),
350 })
351}
352
353fn session_status_to_str(s: SessionStatus) -> &'static str {
354 match s {
355 SessionStatus::Active => "active",
356 SessionStatus::Idle => "idle",
357 SessionStatus::Ended => "ended",
358 }
359}
360
361fn session_status_from_str(s: &str) -> Result<SessionStatus> {
362 Ok(match s {
363 "active" => SessionStatus::Active,
364 "idle" => SessionStatus::Idle,
365 "ended" => SessionStatus::Ended,
366 other => return Err(anyhow!("unknown session status '{other}'")),
367 })
368}
369
370fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
371 Ok(Conversation {
372 id: row.get("id"),
373 platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
374 name: row.get("name"),
375 organization_id: row.get("organization_id"),
376 idempotency_key: row.get("idempotency_key"),
377 metadata_json: row.get("metadata_json"),
378 analytics_json: row.get("analytics_json"),
379 created_at: row.get("created_at"),
380 updated_at: row.get("updated_at"),
381 })
382}
383
384fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
385 Ok(Participant {
386 id: row.get("id"),
387 conversation_id: row.get("conversation_id"),
388 organization_id: row.get("organization_id"),
389 participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
390 external_id: row.get("external_id"),
391 internal_id: row.get("internal_id"),
392 browser_fingerprint: row.get("browser_fingerprint"),
393 browser_info: row.get("browser_info"),
394 name: row.get("name"),
395 email: row.get("email"),
396 phone: row.get("phone"),
397 crm_contact_id: row.get("crm_contact_id"),
398 metadata_json: row.get("metadata_json"),
399 created_at: row.get("created_at"),
400 updated_at: row.get("updated_at"),
401 })
402}
403
404fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
405 let content: serde_json::Value = row.get("content");
406 let content: MessageContent =
407 serde_json::from_value(content).context("decoding message content")?;
408 let from: Option<serde_json::Value> = row.get("from_ref");
409 let to: Option<serde_json::Value> = row.get("to_ref");
410 let from: Option<ParticipantRef> = from
411 .map(serde_json::from_value)
412 .transpose()
413 .context("decoding from_ref")?;
414 let to: Option<ParticipantRef> = to
415 .map(serde_json::from_value)
416 .transpose()
417 .context("decoding to_ref")?;
418 Ok(Message {
419 id: row.get("id"),
420 external_id: row.get("external_id"),
421 organization_id: row.get("organization_id"),
422 conversation_id: row.get("conversation_id"),
423 direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
424 content,
425 from,
426 to,
427 metadata_json: row.get("metadata_json"),
428 analytics_json: row.get("analytics_json"),
429 created_at: row.get("created_at"),
430 updated_at: row.get("updated_at"),
431 })
432}
433
434fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
435 let status: Option<String> = row.get("status");
436 let status = status.map(|s| session_status_from_str(&s)).transpose()?;
437 let token_count: Option<i64> = row.get("token_count");
438 let message_count: Option<i64> = row.get("message_count");
439 let metadata: Option<serde_json::Value> = row.get("metadata");
440 let metadata = metadata
441 .map(serde_json::from_value)
442 .transpose()
443 .context("decoding session metadata")?;
444 Ok(Session {
445 session_id: row.get("session_id"),
446 conversation_id: row.get("conversation_id"),
447 organization_id: row.get("organization_id"),
448 agent_id: row.get("agent_id"),
449 agent_name: row.get("agent_name"),
450 user_participant_id: row.get("user_participant_id"),
451 agent_participant_id: row.get("agent_participant_id"),
452 thread_id: row.get("thread_id"),
453 status,
454 token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
455 message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
456 metadata,
457 created_at: row.get("created_at"),
458 updated_at: row.get("updated_at"),
459 ended_at: row.get("ended_at"),
460 last_activity_at: row.get("last_activity_at"),
461 })
462}
463
464#[async_trait]
465impl StorageAdapter for PostgresAdapter {
466 async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
469 let client = self.pool.get().await?;
470 client
473 .execute(
474 "INSERT INTO conversations
475 (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
476 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
477 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
478 &[
479 &conversation.id,
480 &platform_to_str(conversation.platform),
481 &conversation.name,
482 &conversation.organization_id,
483 &conversation.idempotency_key,
484 &conversation.metadata_json,
485 &conversation.analytics_json,
486 &conversation.created_at,
487 &conversation.updated_at,
488 ],
489 )
490 .await?;
491 let row = client
492 .query_one(
493 "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
494 &[&conversation.organization_id, &conversation.idempotency_key],
495 )
496 .await?;
497 row_to_conversation(&row)
498 }
499
500 async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
501 let client = self.pool.get().await?;
502 let row = client
503 .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
504 .await?;
505 row.as_ref().map(row_to_conversation).transpose()
506 }
507
508 async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
509 let client = self.pool.get().await?;
510 let rows = client
511 .query(
512 "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
513 &[&organization_id],
514 )
515 .await?;
516 rows.iter().map(row_to_conversation).collect()
517 }
518
519 async fn update_conversation(
520 &self,
521 id: &str,
522 update: ConversationUpdate,
523 ) -> Result<Conversation> {
524 let client = self.pool.get().await?;
525 let now = Utc::now();
526 let set_metadata = update.metadata_json.is_some();
530 let set_analytics = update.analytics_json.is_some();
531 let row = client
532 .query_one(
533 "UPDATE conversations SET
534 name = COALESCE($2, name),
535 metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
536 analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
537 updated_at = $7
538 WHERE id = $1
539 RETURNING *",
540 &[
541 &id,
542 &update.name,
543 &set_metadata,
544 &update.metadata_json,
545 &set_analytics,
546 &update.analytics_json,
547 &now,
548 ],
549 )
550 .await
551 .with_context(|| format!("conversation '{id}' not found"))?;
552 row_to_conversation(&row)
553 }
554
555 async fn add_participant(&self, participant: Participant) -> Result<Participant> {
558 let client = self.pool.get().await?;
559 client
560 .execute(
561 "INSERT INTO conversation_participants
562 (id, conversation_id, organization_id, type, external_id, internal_id,
563 browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
564 metadata_json, created_at, updated_at)
565 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
566 &[
567 &participant.id,
568 &participant.conversation_id,
569 &participant.organization_id,
570 &participant_type_to_str(participant.participant_type),
571 &participant.external_id,
572 &participant.internal_id,
573 &participant.browser_fingerprint,
574 &participant.browser_info,
575 &participant.name,
576 &participant.email,
577 &participant.phone,
578 &participant.crm_contact_id,
579 &participant.metadata_json,
580 &participant.created_at,
581 &participant.updated_at,
582 ],
583 )
584 .await?;
585 Ok(participant)
586 }
587
588 async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
589 let client = self.pool.get().await?;
590 let row = client
591 .query_opt(
592 "SELECT * FROM conversation_participants WHERE id = $1",
593 &[&id],
594 )
595 .await?;
596 row.as_ref().map(row_to_participant).transpose()
597 }
598
599 async fn list_participants_by_conversation(
600 &self,
601 conversation_id: &str,
602 ) -> Result<Vec<Participant>> {
603 let client = self.pool.get().await?;
604 let rows = client
605 .query(
606 "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
607 &[&conversation_id],
608 )
609 .await?;
610 rows.iter().map(row_to_participant).collect()
611 }
612
613 async fn resolve_participant_by_external_id(
614 &self,
615 conversation_id: &str,
616 external_id: &str,
617 ) -> Result<Option<Participant>> {
618 let client = self.pool.get().await?;
619 let row = client
620 .query_opt(
621 "SELECT * FROM conversation_participants
622 WHERE conversation_id = $1 AND external_id = $2
623 ORDER BY created_at LIMIT 1",
624 &[&conversation_id, &external_id],
625 )
626 .await?;
627 row.as_ref().map(row_to_participant).transpose()
628 }
629
630 async fn append_message(&self, message: Message) -> Result<Message> {
633 let client = self.pool.get().await?;
634 let content = serde_json::to_value(&message.content)?;
635 let from = message
636 .from
637 .as_ref()
638 .map(serde_json::to_value)
639 .transpose()?;
640 let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
641 client
642 .execute(
643 "INSERT INTO conversation_messages
644 (id, external_id, organization_id, conversation_id, direction, content,
645 from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
646 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
647 &[
648 &message.id,
649 &message.external_id,
650 &message.organization_id,
651 &message.conversation_id,
652 &direction_to_str(message.direction),
653 &content,
654 &from,
655 &to,
656 &message.metadata_json,
657 &message.analytics_json,
658 &message.created_at,
659 &message.updated_at,
660 ],
661 )
662 .await?;
663 Ok(message)
664 }
665
666 async fn get_message(&self, id: &str) -> Result<Option<Message>> {
667 let client = self.pool.get().await?;
668 let row = client
669 .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
670 .await?;
671 row.as_ref().map(row_to_message).transpose()
672 }
673
674 async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
675 let client = self.pool.get().await?;
676 let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
677
678 let cursor_seq: Option<i64> = match &query.cursor {
681 Some(cursor) => {
682 let row = client
683 .query_opt(
684 "SELECT seq FROM conversation_messages WHERE id = $1",
685 &[&cursor],
686 )
687 .await?;
688 row.map(|r| r.get::<_, i64>("seq"))
689 }
690 None => None,
691 };
692
693 let probe = limit_i64.saturating_add(1);
696 let rows = if query.descending {
697 match cursor_seq {
699 Some(seq) => {
700 client
701 .query(
702 "SELECT * FROM conversation_messages
703 WHERE conversation_id = $1 AND seq < $2
704 ORDER BY seq DESC LIMIT $3",
705 &[&query.conversation_id, &seq, &probe],
706 )
707 .await?
708 }
709 None => {
710 client
711 .query(
712 "SELECT * FROM conversation_messages
713 WHERE conversation_id = $1
714 ORDER BY seq DESC LIMIT $2",
715 &[&query.conversation_id, &probe],
716 )
717 .await?
718 }
719 }
720 } else {
721 match cursor_seq {
723 Some(seq) => {
724 client
725 .query(
726 "SELECT * FROM conversation_messages
727 WHERE conversation_id = $1 AND seq > $2
728 ORDER BY seq ASC LIMIT $3",
729 &[&query.conversation_id, &seq, &probe],
730 )
731 .await?
732 }
733 None => {
734 client
735 .query(
736 "SELECT * FROM conversation_messages
737 WHERE conversation_id = $1
738 ORDER BY seq ASC LIMIT $2",
739 &[&query.conversation_id, &probe],
740 )
741 .await?
742 }
743 }
744 };
745
746 let has_more = rows.len() as i64 > limit_i64;
747 let page_rows = if has_more {
748 &rows[..query.limit]
749 } else {
750 &rows[..]
751 };
752 let messages: Vec<Message> = page_rows
753 .iter()
754 .map(row_to_message)
755 .collect::<Result<_>>()?;
756 let next_cursor = if has_more {
757 messages.last().map(|m| m.id.clone())
758 } else {
759 None
760 };
761
762 Ok(MessagePage {
763 messages,
764 next_cursor,
765 })
766 }
767
768 async fn create_session(&self, session: Session) -> Result<Session> {
771 let client = self.pool.get().await?;
772 let status = session.status.map(session_status_to_str);
773 let token_count = session
774 .token_count
775 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
776 let message_count = session
777 .message_count
778 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
779 let metadata = session
780 .metadata
781 .as_ref()
782 .map(serde_json::to_value)
783 .transpose()?;
784 client
785 .execute(
786 "INSERT INTO conversation_sessions
787 (session_id, conversation_id, organization_id, agent_id, agent_name,
788 user_participant_id, agent_participant_id, thread_id, status, token_count,
789 message_count, metadata, created_at, updated_at, ended_at, last_activity_at)
790 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)",
791 &[
792 &session.session_id,
793 &session.conversation_id,
794 &session.organization_id,
795 &session.agent_id,
796 &session.agent_name,
797 &session.user_participant_id,
798 &session.agent_participant_id,
799 &session.thread_id,
800 &status,
801 &token_count,
802 &message_count,
803 &metadata,
804 &session.created_at,
805 &session.updated_at,
806 &session.ended_at,
807 &session.last_activity_at,
808 ],
809 )
810 .await?;
811 Ok(session)
812 }
813
814 async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
815 let client = self.pool.get().await?;
816 let row = client
817 .query_opt(
818 "SELECT * FROM conversation_sessions WHERE session_id = $1",
819 &[&session_id],
820 )
821 .await?;
822 row.as_ref().map(row_to_session).transpose()
823 }
824
825 async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
826 let client = self.pool.get().await?;
827 let now = Utc::now();
828 let status = update.status.map(session_status_to_str);
829 let token_count = update
830 .token_count
831 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
832 let message_count = update
833 .message_count
834 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
835 let set_last_activity = update.last_activity_at.is_some();
837 let set_ended = update.ended_at.is_some();
838 let row = client
839 .query_one(
840 "UPDATE conversation_sessions SET
841 status = COALESCE($2, status),
842 token_count = COALESCE($3, token_count),
843 message_count = COALESCE($4, message_count),
844 last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
845 ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
846 updated_at = $9
847 WHERE session_id = $1
848 RETURNING *",
849 &[
850 &session_id,
851 &status,
852 &token_count,
853 &message_count,
854 &set_last_activity,
855 &update.last_activity_at,
856 &set_ended,
857 &update.ended_at,
858 &now,
859 ],
860 )
861 .await
862 .with_context(|| format!("session '{session_id}' not found"))?;
863 row_to_session(&row)
864 }
865
866 async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
867 let client = self.pool.get().await?;
868 let rows = client
869 .query(
870 "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
871 &[&conversation_id],
872 )
873 .await?;
874 rows.iter().map(row_to_session).collect()
875 }
876
877 fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
880 self.checkpoints
882 .as_ref()
883 .expect("checkpoint store present")
884 .clone()
885 }
886
887 fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
888 self.knowledge.clone()
889 }
890
891 fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
892 Arc::new(self.knowledge.with_access(access.clone()))
898 }
899}