1mod admin;
39mod embedder;
40mod knowledge;
41mod memory;
42mod reranker;
43mod schema;
44
45use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use async_trait::async_trait;
49use chrono::Utc;
50use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
51use tokio_postgres::NoTls;
52
53use smooth_operator::access_control::AccessContext;
54use smooth_operator::adapter::{
55 ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
56};
57use smooth_operator::domain::{
58 Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
59 Platform, Session, SessionStatus,
60};
61use smooth_operator_core::checkpoint::PostgresCheckpointStore;
62use smooth_operator_core::{CheckpointStore, KnowledgeBase};
63
64pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
69pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
70pub use knowledge::PgKnowledgeBase;
71pub use memory::PgMemory;
72pub use reranker::{
73 GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
74};
75pub use smooth_operator::embedding::{
76 DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
77};
78
79pub struct PostgresAdapter {
81 pool: Pool,
82 checkpoints: Option<Arc<PostgresCheckpointStore>>,
88 knowledge: Arc<PgKnowledgeBase>,
89 embedder: Arc<dyn Embedder>,
92 embedding_dim: usize,
93 handle: tokio::runtime::Handle,
96}
97
98impl Drop for PostgresAdapter {
99 fn drop(&mut self) {
100 if let Some(checkpoints) = self.checkpoints.take() {
101 if let Ok(handle) = std::thread::Builder::new()
107 .name("pg-checkpoint-drop".into())
108 .spawn(move || drop(checkpoints))
109 {
110 let _ = handle.join();
111 }
112 }
113 }
114}
115
116impl PostgresAdapter {
117 pub async fn connect(conn_str: &str) -> Result<Self> {
127 Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
128 }
129
130 pub async fn connect_with_embedder(
137 conn_str: &str,
138 embedder: Arc<dyn Embedder>,
139 ) -> Result<Self> {
140 let embedding_dim = embedder.dim();
141
142 let pg_config: tokio_postgres::Config = conn_str
144 .parse()
145 .context("parsing connection string for async pool")?;
146 let mut cfg = PoolConfig::new();
147 cfg.manager = Some(ManagerConfig {
148 recycling_method: RecyclingMethod::Fast,
149 });
150 cfg.dbname = pg_config.get_dbname().map(str::to_string);
152 cfg.user = pg_config.get_user().map(str::to_string);
153 cfg.password = pg_config
154 .get_password()
155 .map(|p| String::from_utf8_lossy(p).into_owned());
156 if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
157 tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
158 tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
159 }) {
160 cfg.host = Some(host);
161 }
162 if let Some(port) = pg_config.get_ports().first().copied() {
163 cfg.port = Some(port);
164 }
165 let pool = cfg
166 .create_pool(Some(Runtime::Tokio1), NoTls)
167 .context("building deadpool")?;
168
169 {
171 let client = pool
172 .get()
173 .await
174 .context("acquiring connection for migration")?;
175 client
176 .batch_execute(schema::OLTP_SCHEMA)
177 .await
178 .context("applying OLTP schema")?;
179 client
180 .batch_execute(schema::ADMIN_SCHEMA)
181 .await
182 .context("applying admin schema")?;
183 client
184 .batch_execute(schema::VECTOR_EXTENSION)
185 .await
186 .context("creating pgvector extension")?;
187 client
188 .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
189 .await
190 .context("applying knowledge_vectors schema")?;
191 client
192 .batch_execute(&schema::memories_schema(embedding_dim))
193 .await
194 .context("applying memories schema")?;
195 }
196
197 let cs_conn = conn_str.to_string();
201 let checkpoints =
202 tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
203 .await
204 .context("checkpoint store setup task panicked")?
205 .context("constructing PostgresCheckpointStore")?;
206 let checkpoints = Arc::new(checkpoints);
207
208 let handle = tokio::runtime::Handle::current();
210 let knowledge = Arc::new(PgKnowledgeBase::new(
211 pool.clone(),
212 embedder.clone(),
213 handle.clone(),
214 None,
215 ));
216
217 Ok(Self {
218 pool,
219 checkpoints: Some(checkpoints),
220 knowledge,
221 embedder,
222 embedding_dim,
223 handle,
224 })
225 }
226
227 pub async fn from_env() -> Result<Self> {
233 let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
234 .or_else(|_| std::env::var("DATABASE_URL"))
235 .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
236 Self::connect(&conn_str).await
237 }
238
239 #[must_use]
241 pub fn embedding_dim(&self) -> usize {
242 self.embedding_dim
243 }
244
245 #[must_use]
249 pub fn connector_config_store(&self) -> PgConnectorConfigStore {
250 PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
251 }
252
253 #[must_use]
256 pub fn settings_store(&self) -> PgSettingsStore {
257 PgSettingsStore::new(self.pool.clone(), self.handle.clone())
258 }
259
260 #[must_use]
263 pub fn indexing_store(&self) -> PgIndexingStore {
264 PgIndexingStore::new(self.pool.clone(), self.handle.clone())
265 }
266
267 #[must_use]
277 pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
278 PgMemory::new(
279 self.pool.clone(),
280 self.embedder.clone(),
281 self.handle.clone(),
282 organization_id,
283 user_id,
284 )
285 }
286}
287
288fn platform_to_str(p: Platform) -> &'static str {
291 match p {
292 Platform::Web => "web",
293 Platform::Messenger => "messenger",
294 Platform::Instagram => "instagram",
295 Platform::Email => "email",
296 Platform::Discord => "discord",
297 Platform::Phone => "phone",
298 Platform::Sms => "sms",
299 Platform::Slack => "slack",
300 Platform::Whatsapp => "whatsapp",
301 Platform::Tiktok => "tiktok",
302 }
303}
304
305fn platform_from_str(s: &str) -> Result<Platform> {
306 Ok(match s {
307 "web" => Platform::Web,
308 "messenger" => Platform::Messenger,
309 "instagram" => Platform::Instagram,
310 "email" => Platform::Email,
311 "discord" => Platform::Discord,
312 "phone" => Platform::Phone,
313 "sms" => Platform::Sms,
314 "slack" => Platform::Slack,
315 "whatsapp" => Platform::Whatsapp,
316 "tiktok" => Platform::Tiktok,
317 other => return Err(anyhow!("unknown platform '{other}'")),
318 })
319}
320
321fn participant_type_to_str(t: ParticipantType) -> &'static str {
322 match t {
323 ParticipantType::User => "user",
324 ParticipantType::AiAgent => "ai-agent",
325 ParticipantType::HumanAgent => "human-agent",
326 }
327}
328
329fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
330 Ok(match s {
331 "user" => ParticipantType::User,
332 "ai-agent" => ParticipantType::AiAgent,
333 "human-agent" => ParticipantType::HumanAgent,
334 other => return Err(anyhow!("unknown participant type '{other}'")),
335 })
336}
337
338fn direction_to_str(d: Direction) -> &'static str {
339 match d {
340 Direction::Inbound => "inbound",
341 Direction::Outbound => "outbound",
342 }
343}
344
345fn direction_from_str(s: &str) -> Result<Direction> {
346 Ok(match s {
347 "inbound" => Direction::Inbound,
348 "outbound" => Direction::Outbound,
349 other => return Err(anyhow!("unknown direction '{other}'")),
350 })
351}
352
353fn session_status_to_str(s: SessionStatus) -> &'static str {
354 match s {
355 SessionStatus::Active => "active",
356 SessionStatus::Idle => "idle",
357 SessionStatus::Ended => "ended",
358 }
359}
360
361fn session_status_from_str(s: &str) -> Result<SessionStatus> {
362 Ok(match s {
363 "active" => SessionStatus::Active,
364 "idle" => SessionStatus::Idle,
365 "ended" => SessionStatus::Ended,
366 other => return Err(anyhow!("unknown session status '{other}'")),
367 })
368}
369
370fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
371 Ok(Conversation {
372 id: row.get("id"),
373 platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
374 name: row.get("name"),
375 organization_id: row.get("organization_id"),
376 idempotency_key: row.get("idempotency_key"),
377 metadata_json: row.get("metadata_json"),
378 analytics_json: row.get("analytics_json"),
379 created_at: row.get("created_at"),
380 updated_at: row.get("updated_at"),
381 })
382}
383
384fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
385 Ok(Participant {
386 id: row.get("id"),
387 conversation_id: row.get("conversation_id"),
388 organization_id: row.get("organization_id"),
389 participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
390 external_id: row.get("external_id"),
391 internal_id: row.get("internal_id"),
392 browser_fingerprint: row.get("browser_fingerprint"),
393 browser_info: row.get("browser_info"),
394 name: row.get("name"),
395 email: row.get("email"),
396 phone: row.get("phone"),
397 crm_contact_id: row.get("crm_contact_id"),
398 metadata_json: row.get("metadata_json"),
399 created_at: row.get("created_at"),
400 updated_at: row.get("updated_at"),
401 })
402}
403
404fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
405 let content: serde_json::Value = row.get("content");
406 let content: MessageContent =
407 serde_json::from_value(content).context("decoding message content")?;
408 let from: Option<serde_json::Value> = row.get("from_ref");
409 let to: Option<serde_json::Value> = row.get("to_ref");
410 let from: Option<ParticipantRef> = from
411 .map(serde_json::from_value)
412 .transpose()
413 .context("decoding from_ref")?;
414 let to: Option<ParticipantRef> = to
415 .map(serde_json::from_value)
416 .transpose()
417 .context("decoding to_ref")?;
418 Ok(Message {
419 id: row.get("id"),
420 external_id: row.get("external_id"),
421 organization_id: row.get("organization_id"),
422 conversation_id: row.get("conversation_id"),
423 direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
424 content,
425 from,
426 to,
427 metadata_json: row.get("metadata_json"),
428 analytics_json: row.get("analytics_json"),
429 created_at: row.get("created_at"),
430 updated_at: row.get("updated_at"),
431 })
432}
433
434fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
435 let status: Option<String> = row.get("status");
436 let status = status.map(|s| session_status_from_str(&s)).transpose()?;
437 let token_count: Option<i64> = row.get("token_count");
438 let message_count: Option<i64> = row.get("message_count");
439 let metadata: Option<serde_json::Value> = row.get("metadata");
440 let metadata = metadata
441 .map(serde_json::from_value)
442 .transpose()
443 .context("decoding session metadata")?;
444 Ok(Session {
445 session_id: row.get("session_id"),
446 conversation_id: row.get("conversation_id"),
447 agent_id: row.get("agent_id"),
448 agent_name: row.get("agent_name"),
449 user_participant_id: row.get("user_participant_id"),
450 agent_participant_id: row.get("agent_participant_id"),
451 thread_id: row.get("thread_id"),
452 status,
453 token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
454 message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
455 metadata,
456 created_at: row.get("created_at"),
457 updated_at: row.get("updated_at"),
458 ended_at: row.get("ended_at"),
459 last_activity_at: row.get("last_activity_at"),
460 })
461}
462
463#[async_trait]
464impl StorageAdapter for PostgresAdapter {
465 async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
468 let client = self.pool.get().await?;
469 client
472 .execute(
473 "INSERT INTO conversations
474 (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
475 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
476 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
477 &[
478 &conversation.id,
479 &platform_to_str(conversation.platform),
480 &conversation.name,
481 &conversation.organization_id,
482 &conversation.idempotency_key,
483 &conversation.metadata_json,
484 &conversation.analytics_json,
485 &conversation.created_at,
486 &conversation.updated_at,
487 ],
488 )
489 .await?;
490 let row = client
491 .query_one(
492 "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
493 &[&conversation.organization_id, &conversation.idempotency_key],
494 )
495 .await?;
496 row_to_conversation(&row)
497 }
498
499 async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
500 let client = self.pool.get().await?;
501 let row = client
502 .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
503 .await?;
504 row.as_ref().map(row_to_conversation).transpose()
505 }
506
507 async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
508 let client = self.pool.get().await?;
509 let rows = client
510 .query(
511 "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
512 &[&organization_id],
513 )
514 .await?;
515 rows.iter().map(row_to_conversation).collect()
516 }
517
518 async fn update_conversation(
519 &self,
520 id: &str,
521 update: ConversationUpdate,
522 ) -> Result<Conversation> {
523 let client = self.pool.get().await?;
524 let now = Utc::now();
525 let set_metadata = update.metadata_json.is_some();
529 let set_analytics = update.analytics_json.is_some();
530 let row = client
531 .query_one(
532 "UPDATE conversations SET
533 name = COALESCE($2, name),
534 metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
535 analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
536 updated_at = $7
537 WHERE id = $1
538 RETURNING *",
539 &[
540 &id,
541 &update.name,
542 &set_metadata,
543 &update.metadata_json,
544 &set_analytics,
545 &update.analytics_json,
546 &now,
547 ],
548 )
549 .await
550 .with_context(|| format!("conversation '{id}' not found"))?;
551 row_to_conversation(&row)
552 }
553
554 async fn add_participant(&self, participant: Participant) -> Result<Participant> {
557 let client = self.pool.get().await?;
558 client
559 .execute(
560 "INSERT INTO conversation_participants
561 (id, conversation_id, organization_id, type, external_id, internal_id,
562 browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
563 metadata_json, created_at, updated_at)
564 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
565 &[
566 &participant.id,
567 &participant.conversation_id,
568 &participant.organization_id,
569 &participant_type_to_str(participant.participant_type),
570 &participant.external_id,
571 &participant.internal_id,
572 &participant.browser_fingerprint,
573 &participant.browser_info,
574 &participant.name,
575 &participant.email,
576 &participant.phone,
577 &participant.crm_contact_id,
578 &participant.metadata_json,
579 &participant.created_at,
580 &participant.updated_at,
581 ],
582 )
583 .await?;
584 Ok(participant)
585 }
586
587 async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
588 let client = self.pool.get().await?;
589 let row = client
590 .query_opt(
591 "SELECT * FROM conversation_participants WHERE id = $1",
592 &[&id],
593 )
594 .await?;
595 row.as_ref().map(row_to_participant).transpose()
596 }
597
598 async fn list_participants_by_conversation(
599 &self,
600 conversation_id: &str,
601 ) -> Result<Vec<Participant>> {
602 let client = self.pool.get().await?;
603 let rows = client
604 .query(
605 "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
606 &[&conversation_id],
607 )
608 .await?;
609 rows.iter().map(row_to_participant).collect()
610 }
611
612 async fn resolve_participant_by_external_id(
613 &self,
614 conversation_id: &str,
615 external_id: &str,
616 ) -> Result<Option<Participant>> {
617 let client = self.pool.get().await?;
618 let row = client
619 .query_opt(
620 "SELECT * FROM conversation_participants
621 WHERE conversation_id = $1 AND external_id = $2
622 ORDER BY created_at LIMIT 1",
623 &[&conversation_id, &external_id],
624 )
625 .await?;
626 row.as_ref().map(row_to_participant).transpose()
627 }
628
629 async fn append_message(&self, message: Message) -> Result<Message> {
632 let client = self.pool.get().await?;
633 let content = serde_json::to_value(&message.content)?;
634 let from = message
635 .from
636 .as_ref()
637 .map(serde_json::to_value)
638 .transpose()?;
639 let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
640 client
641 .execute(
642 "INSERT INTO conversation_messages
643 (id, external_id, organization_id, conversation_id, direction, content,
644 from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
645 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
646 &[
647 &message.id,
648 &message.external_id,
649 &message.organization_id,
650 &message.conversation_id,
651 &direction_to_str(message.direction),
652 &content,
653 &from,
654 &to,
655 &message.metadata_json,
656 &message.analytics_json,
657 &message.created_at,
658 &message.updated_at,
659 ],
660 )
661 .await?;
662 Ok(message)
663 }
664
665 async fn get_message(&self, id: &str) -> Result<Option<Message>> {
666 let client = self.pool.get().await?;
667 let row = client
668 .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
669 .await?;
670 row.as_ref().map(row_to_message).transpose()
671 }
672
673 async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
674 let client = self.pool.get().await?;
675 let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
676
677 let cursor_seq: Option<i64> = match &query.cursor {
680 Some(cursor) => {
681 let row = client
682 .query_opt(
683 "SELECT seq FROM conversation_messages WHERE id = $1",
684 &[&cursor],
685 )
686 .await?;
687 row.map(|r| r.get::<_, i64>("seq"))
688 }
689 None => None,
690 };
691
692 let probe = limit_i64.saturating_add(1);
695 let rows = if query.descending {
696 match cursor_seq {
698 Some(seq) => {
699 client
700 .query(
701 "SELECT * FROM conversation_messages
702 WHERE conversation_id = $1 AND seq < $2
703 ORDER BY seq DESC LIMIT $3",
704 &[&query.conversation_id, &seq, &probe],
705 )
706 .await?
707 }
708 None => {
709 client
710 .query(
711 "SELECT * FROM conversation_messages
712 WHERE conversation_id = $1
713 ORDER BY seq DESC LIMIT $2",
714 &[&query.conversation_id, &probe],
715 )
716 .await?
717 }
718 }
719 } else {
720 match cursor_seq {
722 Some(seq) => {
723 client
724 .query(
725 "SELECT * FROM conversation_messages
726 WHERE conversation_id = $1 AND seq > $2
727 ORDER BY seq ASC LIMIT $3",
728 &[&query.conversation_id, &seq, &probe],
729 )
730 .await?
731 }
732 None => {
733 client
734 .query(
735 "SELECT * FROM conversation_messages
736 WHERE conversation_id = $1
737 ORDER BY seq ASC LIMIT $2",
738 &[&query.conversation_id, &probe],
739 )
740 .await?
741 }
742 }
743 };
744
745 let has_more = rows.len() as i64 > limit_i64;
746 let page_rows = if has_more {
747 &rows[..query.limit]
748 } else {
749 &rows[..]
750 };
751 let messages: Vec<Message> = page_rows
752 .iter()
753 .map(row_to_message)
754 .collect::<Result<_>>()?;
755 let next_cursor = if has_more {
756 messages.last().map(|m| m.id.clone())
757 } else {
758 None
759 };
760
761 Ok(MessagePage {
762 messages,
763 next_cursor,
764 })
765 }
766
767 async fn create_session(&self, session: Session) -> Result<Session> {
770 let client = self.pool.get().await?;
771 let status = session.status.map(session_status_to_str);
772 let token_count = session
773 .token_count
774 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
775 let message_count = session
776 .message_count
777 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
778 let metadata = session
779 .metadata
780 .as_ref()
781 .map(serde_json::to_value)
782 .transpose()?;
783 client
784 .execute(
785 "INSERT INTO conversation_sessions
786 (session_id, conversation_id, agent_id, agent_name, user_participant_id,
787 agent_participant_id, thread_id, status, token_count, message_count,
788 metadata, created_at, updated_at, ended_at, last_activity_at)
789 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
790 &[
791 &session.session_id,
792 &session.conversation_id,
793 &session.agent_id,
794 &session.agent_name,
795 &session.user_participant_id,
796 &session.agent_participant_id,
797 &session.thread_id,
798 &status,
799 &token_count,
800 &message_count,
801 &metadata,
802 &session.created_at,
803 &session.updated_at,
804 &session.ended_at,
805 &session.last_activity_at,
806 ],
807 )
808 .await?;
809 Ok(session)
810 }
811
812 async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
813 let client = self.pool.get().await?;
814 let row = client
815 .query_opt(
816 "SELECT * FROM conversation_sessions WHERE session_id = $1",
817 &[&session_id],
818 )
819 .await?;
820 row.as_ref().map(row_to_session).transpose()
821 }
822
823 async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
824 let client = self.pool.get().await?;
825 let now = Utc::now();
826 let status = update.status.map(session_status_to_str);
827 let token_count = update
828 .token_count
829 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
830 let message_count = update
831 .message_count
832 .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
833 let set_last_activity = update.last_activity_at.is_some();
835 let set_ended = update.ended_at.is_some();
836 let row = client
837 .query_one(
838 "UPDATE conversation_sessions SET
839 status = COALESCE($2, status),
840 token_count = COALESCE($3, token_count),
841 message_count = COALESCE($4, message_count),
842 last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
843 ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
844 updated_at = $9
845 WHERE session_id = $1
846 RETURNING *",
847 &[
848 &session_id,
849 &status,
850 &token_count,
851 &message_count,
852 &set_last_activity,
853 &update.last_activity_at,
854 &set_ended,
855 &update.ended_at,
856 &now,
857 ],
858 )
859 .await
860 .with_context(|| format!("session '{session_id}' not found"))?;
861 row_to_session(&row)
862 }
863
864 async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
865 let client = self.pool.get().await?;
866 let rows = client
867 .query(
868 "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
869 &[&conversation_id],
870 )
871 .await?;
872 rows.iter().map(row_to_session).collect()
873 }
874
875 fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
878 self.checkpoints
880 .as_ref()
881 .expect("checkpoint store present")
882 .clone()
883 }
884
885 fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
886 self.knowledge.clone()
887 }
888
889 fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
890 Arc::new(self.knowledge.with_access(access.clone()))
896 }
897}