1use async_trait::async_trait;
4use zoey_core::observability::types::LLMCostRecord;
5use zoey_core::{types::*, ZoeyError, Result};
6use sqlx::{postgres::PgPoolOptions, Arguments, PgPool, Row};
7use tracing::{debug, info, warn};
8
/// Allowlist of table names accepted by `validate_table_name`.
///
/// NOTE(review): `init_schema` also creates `participants`, `tasks`,
/// `llm_costs` and `stored_prompts`, which are absent here, while several
/// listed names (`goals`, `cache`, `embeddings`, `documents`,
/// `conversations`) are not created by this adapter — confirm whether the
/// mismatch is intentional.
const ALLOWED_TABLES: &[&str] = &[
    "memories",
    "agents",
    "entities",
    "worlds",
    "rooms",
    "relationships",
    "goals",
    "logs",
    "cache",
    "components",
    "embeddings",
    "documents",
    "conversations",
];
25
26fn validate_table_name(name: &str) -> Result<&str> {
33 if name.len() > 64 {
35 warn!("Rejected table name due to length: {} chars", name.len());
36 return Err(ZoeyError::validation(format!(
37 "Table name too long: {} (max 64 characters)",
38 name.len()
39 )));
40 }
41
42 if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
44 warn!("Rejected table name with invalid characters: {}", name);
45 return Err(ZoeyError::validation(format!(
46 "Invalid table name '{}': only alphanumeric characters and underscores allowed",
47 name
48 )));
49 }
50
51 if !ALLOWED_TABLES.contains(&name) {
53 warn!("Rejected unknown table name: {}", name);
54 return Err(ZoeyError::validation(format!(
55 "Unknown table name '{}': not in allowed list",
56 name
57 )));
58 }
59
60 Ok(name)
61}
62
63fn validate_identifier(name: &str) -> Result<&str> {
65 if name.len() > 64 {
66 warn!("Rejected identifier due to length: {} chars", name.len());
67 return Err(ZoeyError::validation(format!(
68 "Identifier too long: {} (max 64 characters)",
69 name.len()
70 )));
71 }
72 if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
73 warn!("Rejected identifier with invalid characters: {}", name);
74 return Err(ZoeyError::validation(format!(
75 "Invalid identifier '{}': only alphanumeric characters and underscores allowed",
76 name
77 )));
78 }
79 Ok(name)
80}
81
/// PostgreSQL-backed database adapter built on an `sqlx` connection pool.
pub struct PostgresAdapter {
    // Shared connection pool; handles are cheaply cloneable.
    pool: PgPool,
    // Embedding vector dimension (defaults to 1536 in the constructors).
    // Wrapped in an RwLock because `ensure_embedding_dimension` mutates it
    // through `&self`.
    embedding_dimension: std::sync::RwLock<usize>,
}
87
88impl PostgresAdapter {
89 pub async fn set_current_agent(&self, agent_id: uuid::Uuid) -> Result<()> {
91 sqlx::query("SELECT set_config('app.current_agent_id', $1, false)")
92 .bind(agent_id.to_string())
93 .execute(&self.pool)
94 .await?;
95 Ok(())
96 }
97 pub async fn new(database_url: &str) -> Result<Self> {
99 info!("Connecting to PostgreSQL database...");
100
101 let pool = PgPoolOptions::new()
102 .max_connections(10)
103 .connect(database_url)
104 .await
105 .map_err(|e| ZoeyError::DatabaseSqlx(e))?;
106
107 Ok(Self {
108 pool,
109 embedding_dimension: std::sync::RwLock::new(1536), })
111 }
112
    /// Build an adapter around an existing pool (useful for tests or when
    /// the caller manages pool configuration itself).
    ///
    /// NOTE(review): nothing here awaits; `async` looks unnecessary but is
    /// kept so existing `.await` call sites keep compiling.
    pub async fn with_pool(pool: PgPool) -> Self {
        Self {
            pool,
            embedding_dimension: std::sync::RwLock::new(1536),
        }
    }
120
121 async fn init_schema(&self) -> Result<()> {
123 debug!("Initializing database schema...");
124
125 sqlx::query("CREATE EXTENSION IF NOT EXISTS vector")
127 .execute(&self.pool)
128 .await
129 .ok(); sqlx::query(
138 r#"
139 CREATE TABLE IF NOT EXISTS agents (
140 id UUID PRIMARY KEY,
141 name TEXT NOT NULL,
142 character JSONB NOT NULL,
143 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
144 updated_at BIGINT
145 )
146 "#,
147 )
148 .execute(&self.pool)
149 .await?;
150
151 sqlx::query(
153 r#"
154 CREATE TABLE IF NOT EXISTS entities (
155 id UUID PRIMARY KEY,
156 agent_id UUID NOT NULL,
157 name TEXT,
158 username TEXT,
159 email TEXT,
160 avatar_url TEXT,
161 metadata JSONB DEFAULT '{}',
162 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
163 )
164 "#,
165 )
166 .execute(&self.pool)
167 .await?;
168
169 sqlx::query(
171 r#"
172 CREATE TABLE IF NOT EXISTS worlds (
173 id UUID PRIMARY KEY,
174 name TEXT NOT NULL,
175 agent_id UUID NOT NULL,
176 server_id TEXT,
177 metadata JSONB DEFAULT '{}',
178 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
179 )
180 "#,
181 )
182 .execute(&self.pool)
183 .await?;
184
185 sqlx::query(
187 r#"
188 CREATE TABLE IF NOT EXISTS rooms (
189 id UUID PRIMARY KEY,
190 agent_id UUID,
191 name TEXT NOT NULL,
192 source TEXT NOT NULL,
193 type TEXT NOT NULL,
194 channel_id TEXT,
195 server_id TEXT,
196 world_id UUID NOT NULL,
197 metadata JSONB DEFAULT '{}',
198 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
199 )
200 "#,
201 )
202 .execute(&self.pool)
203 .await?;
204
205 sqlx::query(
207 r#"
208 CREATE TABLE IF NOT EXISTS memories (
209 id UUID PRIMARY KEY,
210 entity_id UUID NOT NULL,
211 agent_id UUID NOT NULL,
212 room_id UUID NOT NULL,
213 content JSONB NOT NULL,
214 embedding vector,
215 metadata JSONB,
216 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
217 unique_flag BOOLEAN DEFAULT FALSE
218 )
219 "#,
220 )
221 .execute(&self.pool)
222 .await?;
223
224 sqlx::query(
226 r#"
227 CREATE TABLE IF NOT EXISTS participants (
228 entity_id UUID NOT NULL,
229 room_id UUID NOT NULL,
230 joined_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
231 metadata JSONB DEFAULT '{}',
232 PRIMARY KEY (entity_id, room_id)
233 )
234 "#,
235 )
236 .execute(&self.pool)
237 .await?;
238
239 sqlx::query(
242 r#"
243 CREATE TABLE IF NOT EXISTS relationships (
244 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
245 entity_id_a UUID NOT NULL,
246 entity_id_b UUID NOT NULL,
247 type TEXT NOT NULL,
248 agent_id UUID NOT NULL,
249 metadata JSONB DEFAULT '{}',
250 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
251 UNIQUE (entity_id_a, entity_id_b, type)
252 )
253 "#,
254 )
255 .execute(&self.pool)
256 .await?;
257
258 sqlx::query(
260 r#"
261 CREATE TABLE IF NOT EXISTS components (
262 id UUID PRIMARY KEY,
263 entity_id UUID NOT NULL,
264 world_id UUID NOT NULL,
265 source_entity_id UUID,
266 type TEXT NOT NULL,
267 data JSONB NOT NULL,
268 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
269 updated_at BIGINT
270 )
271 "#,
272 )
273 .execute(&self.pool)
274 .await?;
275
276 sqlx::query(
278 r#"
279 CREATE TABLE IF NOT EXISTS tasks (
280 id UUID PRIMARY KEY,
281 agent_id UUID NOT NULL,
282 task_type TEXT NOT NULL,
283 data JSONB NOT NULL,
284 status TEXT NOT NULL DEFAULT 'PENDING',
285 priority INTEGER DEFAULT 0,
286 scheduled_at BIGINT,
287 executed_at BIGINT,
288 retry_count INTEGER DEFAULT 0,
289 max_retries INTEGER DEFAULT 3,
290 error TEXT,
291 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
292 updated_at BIGINT
293 )
294 "#,
295 )
296 .execute(&self.pool)
297 .await?;
298
299 sqlx::query(
301 r#"
302 CREATE TABLE IF NOT EXISTS logs (
303 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
304 entity_id UUID NOT NULL,
305 room_id UUID,
306 body JSONB NOT NULL,
307 type TEXT NOT NULL,
308 created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
309 )
310 "#,
311 )
312 .execute(&self.pool)
313 .await?;
314
315 sqlx::query(
321 r#"
322 DO $$
323 BEGIN
324 -- entities -> agents
325 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_entities_agent') THEN
326 ALTER TABLE entities ADD CONSTRAINT fk_entities_agent
327 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
328 END IF;
329
330 -- worlds -> agents
331 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_worlds_agent') THEN
332 ALTER TABLE worlds ADD CONSTRAINT fk_worlds_agent
333 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
334 END IF;
335
336 -- rooms -> worlds
337 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_rooms_world') THEN
338 ALTER TABLE rooms ADD CONSTRAINT fk_rooms_world
339 FOREIGN KEY (world_id) REFERENCES worlds(id) ON DELETE CASCADE;
340 END IF;
341
342 -- rooms -> agents (optional)
343 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_rooms_agent') THEN
344 ALTER TABLE rooms ADD CONSTRAINT fk_rooms_agent
345 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE SET NULL;
346 END IF;
347
348 -- memories -> entities
349 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_entity') THEN
350 ALTER TABLE memories ADD CONSTRAINT fk_memories_entity
351 FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
352 END IF;
353
354 -- memories -> agents
355 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_agent') THEN
356 ALTER TABLE memories ADD CONSTRAINT fk_memories_agent
357 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
358 END IF;
359
360 -- memories -> rooms
361 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_room') THEN
362 ALTER TABLE memories ADD CONSTRAINT fk_memories_room
363 FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE CASCADE;
364 END IF;
365
366 -- participants -> entities
367 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_participants_entity') THEN
368 ALTER TABLE participants ADD CONSTRAINT fk_participants_entity
369 FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
370 END IF;
371
372 -- participants -> rooms
373 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_participants_room') THEN
374 ALTER TABLE participants ADD CONSTRAINT fk_participants_room
375 FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE CASCADE;
376 END IF;
377
378 -- relationships -> entities (both sides)
379 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_entity_a') THEN
380 ALTER TABLE relationships ADD CONSTRAINT fk_relationships_entity_a
381 FOREIGN KEY (entity_id_a) REFERENCES entities(id) ON DELETE CASCADE;
382 END IF;
383
384 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_entity_b') THEN
385 ALTER TABLE relationships ADD CONSTRAINT fk_relationships_entity_b
386 FOREIGN KEY (entity_id_b) REFERENCES entities(id) ON DELETE CASCADE;
387 END IF;
388
389 -- relationships -> agents
390 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_agent') THEN
391 ALTER TABLE relationships ADD CONSTRAINT fk_relationships_agent
392 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
393 END IF;
394
395 -- components -> entities
396 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_entity') THEN
397 ALTER TABLE components ADD CONSTRAINT fk_components_entity
398 FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
399 END IF;
400
401 -- components -> worlds
402 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_world') THEN
403 ALTER TABLE components ADD CONSTRAINT fk_components_world
404 FOREIGN KEY (world_id) REFERENCES worlds(id) ON DELETE CASCADE;
405 END IF;
406
407 -- components -> entities (source)
408 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_source_entity') THEN
409 ALTER TABLE components ADD CONSTRAINT fk_components_source_entity
410 FOREIGN KEY (source_entity_id) REFERENCES entities(id) ON DELETE SET NULL;
411 END IF;
412
413 -- tasks -> agents
414 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_tasks_agent') THEN
415 ALTER TABLE tasks ADD CONSTRAINT fk_tasks_agent
416 FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
417 END IF;
418
419 -- logs -> entities
420 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_logs_entity') THEN
421 ALTER TABLE logs ADD CONSTRAINT fk_logs_entity
422 FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
423 END IF;
424
425 -- logs -> rooms (optional)
426 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_logs_room') THEN
427 ALTER TABLE logs ADD CONSTRAINT fk_logs_room
428 FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE SET NULL;
429 END IF;
430 END $$;
431 "#,
432 )
433 .execute(&self.pool)
434 .await?;
435
436 sqlx::query(
442 r#"
443 DO $$
444 BEGIN
445 IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_components_entity_world_type_source') THEN
446 ALTER TABLE components ADD CONSTRAINT uq_components_entity_world_type_source
447 UNIQUE (entity_id, world_id, type, source_entity_id);
448 END IF;
449 END $$;
450 "#,
451 )
452 .execute(&self.pool)
453 .await?;
454
455 sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_agent_id ON entities(agent_id)")
461 .execute(&self.pool)
462 .await?;
463
464 sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_username ON entities(username) WHERE username IS NOT NULL")
465 .execute(&self.pool)
466 .await?;
467
468 sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_agent_id ON worlds(agent_id)")
470 .execute(&self.pool)
471 .await?;
472
473 sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_server_id ON worlds(server_id) WHERE server_id IS NOT NULL")
474 .execute(&self.pool)
475 .await?;
476
477 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_world_id ON rooms(world_id)")
479 .execute(&self.pool)
480 .await?;
481
482 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_agent_id ON rooms(agent_id) WHERE agent_id IS NOT NULL")
483 .execute(&self.pool)
484 .await?;
485
486 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_channel_id ON rooms(channel_id) WHERE channel_id IS NOT NULL")
487 .execute(&self.pool)
488 .await?;
489
490 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_source_server ON rooms(source, server_id) WHERE server_id IS NOT NULL")
491 .execute(&self.pool)
492 .await?;
493
494 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_type ON rooms(type)")
495 .execute(&self.pool)
496 .await?;
497
498 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id)")
500 .execute(&self.pool)
501 .await?;
502
503 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_room_id ON memories(room_id)")
504 .execute(&self.pool)
505 .await?;
506
507 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_entity_id ON memories(entity_id)")
508 .execute(&self.pool)
509 .await?;
510
511 sqlx::query(
512 "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at DESC)",
513 )
514 .execute(&self.pool)
515 .await?;
516
517 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_room_created ON memories(agent_id, room_id, created_at DESC)")
518 .execute(&self.pool)
519 .await?;
520
521 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_unique ON memories(agent_id, room_id) WHERE unique_flag = TRUE")
522 .execute(&self.pool)
523 .await?;
524
525 sqlx::query(
527 "CREATE INDEX IF NOT EXISTS idx_participants_entity_id ON participants(entity_id)",
528 )
529 .execute(&self.pool)
530 .await?;
531
532 sqlx::query("CREATE INDEX IF NOT EXISTS idx_participants_room_id ON participants(room_id)")
533 .execute(&self.pool)
534 .await?;
535
536 sqlx::query(
538 "CREATE INDEX IF NOT EXISTS idx_relationships_entity_a ON relationships(entity_id_a)",
539 )
540 .execute(&self.pool)
541 .await?;
542
543 sqlx::query(
544 "CREATE INDEX IF NOT EXISTS idx_relationships_entity_b ON relationships(entity_id_b)",
545 )
546 .execute(&self.pool)
547 .await?;
548
549 sqlx::query(
550 "CREATE INDEX IF NOT EXISTS idx_relationships_agent_id ON relationships(agent_id)",
551 )
552 .execute(&self.pool)
553 .await?;
554
555 sqlx::query("CREATE INDEX IF NOT EXISTS idx_relationships_type ON relationships(type)")
556 .execute(&self.pool)
557 .await?;
558
559 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_entity_id ON components(entity_id)")
561 .execute(&self.pool)
562 .await?;
563
564 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_world_id ON components(world_id)")
565 .execute(&self.pool)
566 .await?;
567
568 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_type ON components(type)")
569 .execute(&self.pool)
570 .await?;
571
572 sqlx::query(
573 "CREATE INDEX IF NOT EXISTS idx_components_entity_type ON components(entity_id, type)",
574 )
575 .execute(&self.pool)
576 .await?;
577
578 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_id ON tasks(agent_id)")
580 .execute(&self.pool)
581 .await?;
582
583 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
584 .execute(&self.pool)
585 .await?;
586
587 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status_scheduled ON tasks(status, scheduled_at) WHERE status = 'PENDING'")
588 .execute(&self.pool)
589 .await?;
590
591 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_status ON tasks(agent_id, status)")
592 .execute(&self.pool)
593 .await?;
594
595 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_id ON logs(entity_id)")
597 .execute(&self.pool)
598 .await?;
599
600 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_room_id ON logs(room_id) WHERE room_id IS NOT NULL")
601 .execute(&self.pool)
602 .await?;
603
604 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_type ON logs(type)")
605 .execute(&self.pool)
606 .await?;
607
608 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_created_at ON logs(created_at DESC)")
609 .execute(&self.pool)
610 .await?;
611
612 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_created ON logs(entity_id, created_at DESC)")
613 .execute(&self.pool)
614 .await?;
615
616 sqlx::query(
621 r#"
622 CREATE TABLE IF NOT EXISTS llm_costs (
623 id UUID PRIMARY KEY,
624 timestamp TIMESTAMPTZ NOT NULL,
625
626 agent_id UUID NOT NULL,
627 user_id TEXT,
628 conversation_id UUID,
629 action_name TEXT,
630 evaluator_name TEXT,
631
632 provider TEXT NOT NULL,
633 model TEXT NOT NULL,
634 temperature REAL NOT NULL,
635
636 prompt_tokens INTEGER NOT NULL,
637 completion_tokens INTEGER NOT NULL,
638 total_tokens INTEGER NOT NULL,
639 cached_tokens INTEGER,
640
641 input_cost_usd REAL NOT NULL,
642 output_cost_usd REAL NOT NULL,
643 total_cost_usd REAL NOT NULL,
644
645 latency_ms BIGINT NOT NULL,
646 ttft_ms BIGINT,
647
648 success BOOLEAN NOT NULL,
649 error TEXT,
650
651 prompt_hash TEXT,
652 prompt_preview TEXT
653 )
654 "#,
655 )
656 .execute(&self.pool)
657 .await?;
658
659 sqlx::query(
660 r#"
661 CREATE TABLE IF NOT EXISTS stored_prompts (
662 id UUID PRIMARY KEY,
663 timestamp TIMESTAMPTZ NOT NULL,
664 cost_record_id UUID REFERENCES llm_costs(id) ON DELETE CASCADE,
665 agent_id UUID NOT NULL,
666 conversation_id UUID,
667 prompt_hash TEXT NOT NULL,
668 prompt_text TEXT,
669 prompt_length INTEGER NOT NULL,
670 sanitized BOOLEAN NOT NULL,
671 sanitization_level TEXT NOT NULL,
672 completion_text TEXT,
673 completion_length INTEGER NOT NULL DEFAULT 0,
674 model TEXT NOT NULL,
675 temperature REAL NOT NULL
676 )
677 "#,
678 )
679 .execute(&self.pool)
680 .await?;
681
682 sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_agent_timestamp ON llm_costs(agent_id, timestamp DESC)")
684 .execute(&self.pool)
685 .await?;
686
687 sqlx::query(
688 "CREATE INDEX IF NOT EXISTS idx_llm_costs_provider_model ON llm_costs(provider, model)",
689 )
690 .execute(&self.pool)
691 .await?;
692
693 sqlx::query(
694 "CREATE INDEX IF NOT EXISTS idx_llm_costs_timestamp ON llm_costs(timestamp DESC)",
695 )
696 .execute(&self.pool)
697 .await?;
698
699 sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_conversation ON llm_costs(conversation_id) WHERE conversation_id IS NOT NULL")
700 .execute(&self.pool)
701 .await?;
702
703 sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_agent_timestamp ON stored_prompts(agent_id, timestamp DESC)")
704 .execute(&self.pool)
705 .await?;
706
707 sqlx::query(
708 "CREATE INDEX IF NOT EXISTS idx_stored_prompts_hash ON stored_prompts(prompt_hash)",
709 )
710 .execute(&self.pool)
711 .await?;
712
713 sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_cost_record ON stored_prompts(cost_record_id)")
714 .execute(&self.pool)
715 .await?;
716
717 info!("Database schema initialized successfully");
718 Ok(())
719 }
720}
721
722#[async_trait]
723impl IDatabaseAdapter for PostgresAdapter {
    fn db(&self) -> &dyn std::any::Any {
        // Expose the pool as `Any` so callers can downcast to `PgPool`.
        &self.pool
    }
727
    /// Create the schema; the optional config payload is currently ignored.
    async fn initialize(&mut self, _config: Option<serde_json::Value>) -> Result<()> {
        self.init_schema().await
    }
731
732 async fn is_ready(&self) -> Result<bool> {
733 match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
735 Ok(_) => Ok(true),
736 Err(_) => Ok(false),
737 }
738 }
739
    /// Gracefully close the connection pool.
    async fn close(&mut self) -> Result<()> {
        self.pool.close().await;
        Ok(())
    }
744
    /// Hand out a type-erased pool handle; pool clones share the same
    /// underlying connections.
    async fn get_connection(&self) -> Result<Box<dyn std::any::Any + Send>> {
        Ok(Box::new(self.pool.clone()))
    }
748
    /// Apply schema definitions supplied by plugins.
    ///
    /// Each plugin may carry a JSON `schema` object shaped like
    /// `{ table_name: { "columns": { col_name: "SQL TYPE ..." } } }`.
    /// Tables are emitted as `CREATE TABLE IF NOT EXISTS`, topologically
    /// ordered so tables referenced by inline `REFERENCES` clauses are
    /// created first. Identifiers and column types cannot be bound as
    /// query parameters, so they are validated before interpolation.
    async fn run_plugin_migrations(
        &self,
        plugins: Vec<PluginMigration>,
        options: MigrationOptions,
    ) -> Result<()> {
        if plugins.is_empty() {
            return Ok(());
        }

        for plugin in plugins {
            if let Some(schema) = plugin.schema {
                if options.verbose {
                    info!("Applying schema for plugin '{}'", plugin.name);
                }

                let schema_obj = schema.as_object().ok_or_else(|| {
                    ZoeyError::validation(format!(
                        "Invalid schema for plugin '{}': expected JSON object",
                        plugin.name
                    ))
                })?;

                use std::collections::{HashMap, HashSet, VecDeque};
                // deps[t] = tables t references; reverse_deps is the inverse
                // edge set, used to decrement indegrees during the sort.
                let mut deps: HashMap<String, HashSet<String>> = HashMap::new();
                let mut reverse_deps: HashMap<String, HashSet<String>> = HashMap::new();
                let table_names: Vec<String> = schema_obj.keys().map(|k| k.to_string()).collect();

                // Scan every column type string for an inline REFERENCES
                // clause and record intra-plugin dependencies.
                // NOTE(review): the match is case-sensitive; a lowercase
                // `references` would not be ordered — confirm inputs.
                for (tname, tdef) in schema_obj.iter() {
                    let tname = validate_identifier(tname)?;
                    let columns_obj =
                        tdef.get("columns")
                            .and_then(|v| v.as_object())
                            .ok_or_else(|| {
                                ZoeyError::validation(format!(
                                    "Invalid table definition for '{}': missing 'columns' object",
                                    tname
                                ))
                            })?;
                    for (_col_name, col_type_val) in columns_obj.iter() {
                        if let Some(col_type) = col_type_val.as_str() {
                            if let Some(idx) = col_type.find("REFERENCES") {
                                // Referenced table = first token after the
                                // keyword, stopping at whitespace or '('.
                                let tail = &col_type[idx + "REFERENCES".len()..];
                                let ref_name = tail
                                    .trim()
                                    .split(|c: char| c.is_whitespace() || c == '(')
                                    .filter(|s| !s.is_empty())
                                    .next()
                                    .unwrap_or("");
                                if !ref_name.is_empty() {
                                    let ref_name = validate_identifier(ref_name)?;
                                    // Only order against tables defined by this
                                    // plugin; external references are ignored.
                                    if table_names.iter().any(|n| n == ref_name) {
                                        deps.entry(tname.to_string())
                                            .or_default()
                                            .insert(ref_name.to_string());
                                        reverse_deps
                                            .entry(ref_name.to_string())
                                            .or_default()
                                            .insert(tname.to_string());
                                    }
                                }
                            }
                        }
                    }
                }

                // Kahn's algorithm: topological sort by dependency count.
                let mut indegree: HashMap<String, usize> = HashMap::new();
                for t in &table_names {
                    let d = deps.get(t).map(|s| s.len()).unwrap_or(0);
                    indegree.insert(t.clone(), d);
                }
                let mut q: VecDeque<String> = table_names
                    .iter()
                    .filter(|t| indegree.get(*t).copied().unwrap_or(0) == 0)
                    .cloned()
                    .collect();
                let mut order: Vec<String> = Vec::new();
                while let Some(n) = q.pop_front() {
                    order.push(n.clone());
                    if let Some(children) = reverse_deps.get(&n) {
                        for c in children {
                            if let Some(e) = indegree.get_mut(c) {
                                if *e > 0 {
                                    *e -= 1;
                                    if *e == 0 {
                                        q.push_back(c.clone());
                                    }
                                }
                            }
                        }
                    }
                }
                // Cycle fallback: append tables the sort never reached so they
                // are still created (Postgres will surface any FK errors).
                for t in &table_names {
                    if !order.contains(t) {
                        order.push(t.clone());
                    }
                }

                // Emit CREATE TABLE statements in the computed order.
                for table_name in order {
                    let table_def = &schema_obj[&table_name];
                    let columns_obj = table_def
                        .get("columns")
                        .and_then(|v| v.as_object())
                        .ok_or_else(|| {
                            ZoeyError::validation(format!(
                                "Invalid table definition for '{}': missing 'columns' object",
                                table_name
                            ))
                        })?;

                    let mut cols_sql: Vec<String> = Vec::new();
                    for (col_name, col_type_val) in columns_obj.iter() {
                        let col_name = validate_identifier(col_name)?;
                        let col_type = col_type_val.as_str().ok_or_else(|| {
                            ZoeyError::validation(format!(
                                "Invalid type for column '{}.{}'",
                                table_name, col_name
                            ))
                        })?;

                        // Types are interpolated into DDL, so restrict them to
                        // a conservative character set and length.
                        let allowed_chars = |c: char| {
                            c.is_ascii_alphanumeric()
                                || matches!(c, ' ' | '_' | '(' | ')' | ',' | '-' | ':')
                        };
                        if col_type.len() > 128 || !col_type.chars().all(allowed_chars) {
                            return Err(ZoeyError::validation(format!(
                                "Unsupported or unsafe type for '{}.{}': {}",
                                table_name, col_name, col_type
                            )));
                        }
                        cols_sql.push(format!("{} {}", col_name, col_type));
                    }

                    let create_sql = format!(
                        "CREATE TABLE IF NOT EXISTS {} ({})",
                        table_name,
                        cols_sql.join(", ")
                    );

                    if options.verbose {
                        debug!("{}", create_sql);
                    }

                    if !options.dry_run {
                        sqlx::query(&create_sql).execute(&self.pool).await?;
                    }
                }

                if options.verbose {
                    info!("✓ Schema applied for plugin '{}'", plugin.name);
                }
            }
        }
        Ok(())
    }
908
909 async fn get_agent(&self, agent_id: UUID) -> Result<Option<Agent>> {
911 let row = sqlx::query(
912 "SELECT id, name, character, created_at, updated_at FROM agents WHERE id = $1",
913 )
914 .bind(agent_id)
915 .fetch_optional(&self.pool)
916 .await?;
917
918 match row {
919 Some(row) => Ok(Some(Agent {
920 id: row.get("id"),
921 name: row.get("name"),
922 character: row.get("character"),
923 created_at: row.get("created_at"),
924 updated_at: row.get("updated_at"),
925 })),
926 None => Ok(None),
927 }
928 }
929
930 async fn get_agents(&self) -> Result<Vec<Agent>> {
931 let rows = sqlx::query("SELECT id, name, character, created_at, updated_at FROM agents")
932 .fetch_all(&self.pool)
933 .await?;
934
935 Ok(rows
936 .into_iter()
937 .map(|row| Agent {
938 id: row.get("id"),
939 name: row.get("name"),
940 character: row.get("character"),
941 created_at: row.get("created_at"),
942 updated_at: row.get("updated_at"),
943 })
944 .collect())
945 }
946
947 async fn create_agent(&self, agent: &Agent) -> Result<bool> {
948 let result = sqlx::query(
949 "INSERT INTO agents (id, name, character, created_at, updated_at)
950 VALUES ($1, $2, $3, $4, $5)",
951 )
952 .bind(&agent.id)
953 .bind(&agent.name)
954 .bind(&agent.character)
955 .bind(&agent.created_at)
956 .bind(&agent.updated_at)
957 .execute(&self.pool)
958 .await?;
959
960 Ok(result.rows_affected() > 0)
961 }
962
963 async fn update_agent(&self, agent_id: UUID, agent: &Agent) -> Result<bool> {
964 let result = sqlx::query(
965 "UPDATE agents SET name = $2, character = $3, updated_at = $4 WHERE id = $1",
966 )
967 .bind(agent_id)
968 .bind(&agent.name)
969 .bind(&agent.character)
970 .bind(chrono::Utc::now().timestamp())
971 .execute(&self.pool)
972 .await?;
973
974 Ok(result.rows_affected() > 0)
975 }
976
977 async fn delete_agent(&self, agent_id: UUID) -> Result<bool> {
978 let result = sqlx::query("DELETE FROM agents WHERE id = $1")
979 .bind(agent_id)
980 .execute(&self.pool)
981 .await?;
982
983 Ok(result.rows_affected() > 0)
984 }
985
    /// Record the embedding dimension and make sure pgvector is installed.
    ///
    /// Unlike `init_schema`, a failure to create the extension here is
    /// propagated to the caller.
    async fn ensure_embedding_dimension(&self, dimension: usize) -> Result<()> {
        sqlx::query("CREATE EXTENSION IF NOT EXISTS vector")
            .execute(&self.pool)
            .await?;

        // NOTE(review): `unwrap` panics if the RwLock was poisoned by a
        // previous panic while held — acceptable only if that is considered
        // unrecoverable; confirm.
        *self.embedding_dimension.write().unwrap() = dimension;
        Ok(())
    }
995
996 async fn get_entities_by_ids(&self, entity_ids: Vec<UUID>) -> Result<Vec<Entity>> {
997 if entity_ids.is_empty() {
998 return Ok(vec![]);
999 }
1000 let rows = sqlx::query(
1001 "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id = ANY($1)"
1002 )
1003 .bind(entity_ids)
1004 .fetch_all(&self.pool)
1005 .await?;
1006
1007 Ok(rows
1008 .into_iter()
1009 .map(|row| Entity {
1010 id: row.get("id"),
1011 agent_id: row.get("agent_id"),
1012 name: row.get("name"),
1013 username: row.get("username"),
1014 email: row.get("email"),
1015 avatar_url: row.get("avatar_url"),
1016 metadata: row
1017 .try_get("metadata")
1018 .ok()
1019 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1020 .unwrap_or_default(),
1021 created_at: row.get("created_at"),
1022 })
1023 .collect())
1024 }
1025
1026 async fn get_entities_for_room(
1027 &self,
1028 room_id: UUID,
1029 _include_components: bool,
1030 ) -> Result<Vec<Entity>> {
1031 let rows = sqlx::query(
1032 "SELECT e.id, e.agent_id, e.name, e.username, e.email, e.avatar_url, e.metadata, e.created_at
1033 FROM participants p JOIN entities e ON e.id = p.entity_id WHERE p.room_id = $1"
1034 )
1035 .bind(room_id)
1036 .fetch_all(&self.pool)
1037 .await?;
1038
1039 Ok(rows
1040 .into_iter()
1041 .map(|row| Entity {
1042 id: row.get("id"),
1043 agent_id: row.get("agent_id"),
1044 name: row.get("name"),
1045 username: row.get("username"),
1046 email: row.get("email"),
1047 avatar_url: row.get("avatar_url"),
1048 metadata: row
1049 .try_get("metadata")
1050 .ok()
1051 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1052 .unwrap_or_default(),
1053 created_at: row.get("created_at"),
1054 })
1055 .collect())
1056 }
1057
1058 async fn create_entities(&self, entities: Vec<Entity>) -> Result<bool> {
1059 for entity in entities {
1060 sqlx::query(
1061 "INSERT INTO entities (id, agent_id, name, username, email, avatar_url, metadata, created_at)
1062 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1063 ON CONFLICT (id) DO UPDATE SET
1064 name = EXCLUDED.name,
1065 username = EXCLUDED.username,
1066 email = EXCLUDED.email,
1067 avatar_url = EXCLUDED.avatar_url,
1068 metadata = EXCLUDED.metadata"
1069 )
1070 .bind(entity.id)
1071 .bind(entity.agent_id)
1072 .bind(&entity.name)
1073 .bind(&entity.username)
1074 .bind(&entity.email)
1075 .bind(&entity.avatar_url)
1076 .bind(serde_json::to_value(&entity.metadata)?)
1077 .bind(entity.created_at)
1078 .execute(&self.pool)
1079 .await?;
1080 }
1081 Ok(true)
1082 }
1083
1084 async fn update_entity(&self, entity: &Entity) -> Result<()> {
1085 sqlx::query(
1086 "UPDATE entities SET name = $2, username = $3, email = $4, avatar_url = $5, metadata = $6, created_at = $7 WHERE id = $1"
1087 )
1088 .bind(entity.id)
1089 .bind(&entity.name)
1090 .bind(&entity.username)
1091 .bind(&entity.email)
1092 .bind(&entity.avatar_url)
1093 .bind(serde_json::to_value(&entity.metadata)?)
1094 .bind(entity.created_at)
1095 .execute(&self.pool)
1096 .await?;
1097 Ok(())
1098 }
1099
1100 async fn get_entity_by_id(&self, entity_id: UUID) -> Result<Option<Entity>> {
1101 let row = sqlx::query(
1102 "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at
1103 FROM entities WHERE id = $1",
1104 )
1105 .bind(entity_id)
1106 .fetch_optional(&self.pool)
1107 .await?;
1108
1109 match row {
1110 Some(row) => Ok(Some(Entity {
1111 id: row.get("id"),
1112 agent_id: row.get("agent_id"),
1113 name: row.get("name"),
1114 username: row.get("username"),
1115 email: row.get("email"),
1116 avatar_url: row.get("avatar_url"),
1117 metadata: row
1118 .try_get("metadata")
1119 .ok()
1120 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1121 .unwrap_or_default(),
1122 created_at: row.get("created_at"),
1123 })),
1124 None => Ok(None),
1125 }
1126 }
1127
    /// Fetch one component by entity id and component type, optionally
    /// narrowed by world and source entity.
    ///
    /// The WHERE clause is built dynamically; the bind calls below must
    /// stay in the same order as the appended `$n` placeholders.
    async fn get_component(
        &self,
        entity_id: UUID,
        component_type: &str,
        world_id: Option<UUID>,
        source_entity_id: Option<UUID>,
    ) -> Result<Option<Component>> {
        let mut query = String::from(
            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = $1 AND type = $2"
        );
        // Next placeholder number; $1/$2 are taken by the base query.
        let mut idx = 3;
        if world_id.is_some() {
            query.push_str(&format!(" AND world_id = ${}", idx));
            idx += 1;
        }
        if source_entity_id.is_some() {
            // Last predicate, so `idx` needs no further increment.
            query.push_str(&format!(" AND source_entity_id = ${}", idx));
        }

        // Binds mirror the order the predicates were appended above.
        let mut sql = sqlx::query(&query).bind(entity_id).bind(component_type);
        if let Some(wid) = world_id {
            sql = sql.bind(wid);
        }
        if let Some(seid) = source_entity_id {
            sql = sql.bind(seid);
        }

        let row = sql.fetch_optional(&self.pool).await?;
        Ok(row.map(|row| Component {
            id: row.get("id"),
            entity_id: row.get("entity_id"),
            world_id: row.get("world_id"),
            source_entity_id: row.get("source_entity_id"),
            component_type: row.get("type"),
            data: row.get("data"),
            created_at: row.get("created_at"),
            updated_at: row.get("updated_at"),
        }))
    }
1167
    /// Fetch all components attached to an entity, optionally filtered by
    /// world and/or source entity.
    ///
    /// Same dynamic-WHERE pattern as `get_component`: bind order must
    /// match the order in which `$n` placeholders were appended.
    async fn get_components(
        &self,
        entity_id: UUID,
        world_id: Option<UUID>,
        source_entity_id: Option<UUID>,
    ) -> Result<Vec<Component>> {
        let mut query = String::from(
            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = $1"
        );
        // Next placeholder number; $1 is taken by the base query.
        let mut idx = 2;
        if world_id.is_some() {
            query.push_str(&format!(" AND world_id = ${}", idx));
            idx += 1;
        }
        if source_entity_id.is_some() {
            // Last predicate, so `idx` needs no further increment.
            query.push_str(&format!(" AND source_entity_id = ${}", idx));
        }
        let mut sql = sqlx::query(&query).bind(entity_id);
        if let Some(wid) = world_id {
            sql = sql.bind(wid);
        }
        if let Some(seid) = source_entity_id {
            sql = sql.bind(seid);
        }
        let rows = sql.fetch_all(&self.pool).await?;
        Ok(rows
            .into_iter()
            .map(|row| Component {
                id: row.get("id"),
                entity_id: row.get("entity_id"),
                world_id: row.get("world_id"),
                source_entity_id: row.get("source_entity_id"),
                component_type: row.get("type"),
                data: row.get("data"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
            })
            .collect())
    }
1207
    /// Insert a new component row. Returns `true` when exactly the expected
    /// row was inserted (rows_affected > 0). A duplicate `id` surfaces as a
    /// database error via `?` since there is no ON CONFLICT clause.
    async fn create_component(&self, component: &Component) -> Result<bool> {
        let result = sqlx::query(
            "INSERT INTO components (id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
        )
        .bind(component.id)
        .bind(component.entity_id)
        .bind(component.world_id)
        .bind(component.source_entity_id)
        // Struct field `component_type` maps to the `type` column.
        .bind(&component.component_type)
        .bind(&component.data)
        .bind(component.created_at)
        .bind(component.updated_at)
        .execute(&self.pool)
        .await?;
        Ok(result.rows_affected() > 0)
    }
1225
    /// Update a component's `data` and stamp `updated_at` with the current
    /// wall-clock time.
    ///
    /// NOTE(review): this binds `chrono::Utc::now()` for $3 and ignores
    /// `component.updated_at`, unlike `create_component` which persists the
    /// struct's own timestamp — confirm this asymmetry is intentional.
    /// Updating a non-existent id is silently a no-op (no row count check).
    async fn update_component(&self, component: &Component) -> Result<()> {
        sqlx::query("UPDATE components SET data = $2, updated_at = $3 WHERE id = $1")
            .bind(component.id)
            .bind(&component.data)
            .bind(chrono::Utc::now().timestamp())
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1235
1236 async fn delete_component(&self, component_id: UUID) -> Result<()> {
1237 sqlx::query("DELETE FROM components WHERE id = $1")
1238 .bind(component_id)
1239 .execute(&self.pool)
1240 .await?;
1241 Ok(())
1242 }
1243
1244 async fn get_memories(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1245 let mut query = String::from("SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag FROM memories WHERE 1=1");
1246 let mut param_index = 1;
1247
1248 if let Some(agent_id) = params.agent_id {
1249 query.push_str(&format!(" AND agent_id = ${}", param_index));
1250 param_index += 1;
1251 }
1252 if let Some(room_id) = params.room_id {
1253 query.push_str(&format!(" AND room_id = ${}", param_index));
1254 param_index += 1;
1255 }
1256 if let Some(entity_id) = params.entity_id {
1257 query.push_str(&format!(" AND entity_id = ${}", param_index));
1258 param_index += 1;
1259 }
1260 if let Some(unique) = params.unique {
1261 query.push_str(&format!(" AND unique_flag = ${}", param_index));
1262 param_index += 1;
1263 }
1264
1265 query.push_str(" ORDER BY created_at DESC");
1266
1267 if let Some(count) = params.count {
1268 query.push_str(&format!(" LIMIT ${}", param_index));
1269 param_index += 1;
1270 }
1271
1272 let mut sql_query = sqlx::query(&query);
1273 let mut bind_idx = 1;
1274 if let Some(agent_id) = params.agent_id {
1275 sql_query = sql_query.bind(agent_id);
1276 bind_idx += 1;
1277 }
1278 if let Some(room_id) = params.room_id {
1279 sql_query = sql_query.bind(room_id);
1280 bind_idx += 1;
1281 }
1282 if let Some(entity_id) = params.entity_id {
1283 sql_query = sql_query.bind(entity_id);
1284 bind_idx += 1;
1285 }
1286 if let Some(unique) = params.unique {
1287 sql_query = sql_query.bind(unique);
1288 bind_idx += 1;
1289 }
1290 if let Some(count) = params.count {
1291 sql_query = sql_query.bind(count as i64);
1292 }
1293
1294 let rows = sql_query.fetch_all(&self.pool).await?;
1295
1296 let memories = rows
1297 .into_iter()
1298 .map(|row| Memory {
1299 id: row.get("id"),
1300 entity_id: row.get("entity_id"),
1301 agent_id: row.get("agent_id"),
1302 room_id: row.get("room_id"),
1303 content: serde_json::from_value(row.get("content")).unwrap_or_default(),
1304 embedding: row.get("embedding"),
1305 metadata: row
1306 .try_get("metadata")
1307 .ok()
1308 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok()),
1309 created_at: row.get("created_at"),
1310 unique: Some(row.get("unique_flag")),
1311 similarity: None,
1312 })
1313 .collect();
1314
1315 Ok(memories)
1316 }
1317
    /// Insert a memory row and return its id.
    ///
    /// NOTE(review): `_table_name` is accepted for interface compatibility but
    /// ignored — all writes go to the fixed `memories` table; confirm callers
    /// do not expect per-table routing here.
    async fn create_memory(&self, memory: &Memory, _table_name: &str) -> Result<UUID> {
        sqlx::query(
            "INSERT INTO memories (id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"
        )
        .bind(memory.id)
        .bind(memory.entity_id)
        .bind(memory.agent_id)
        .bind(memory.room_id)
        // content/metadata are serialized to JSON columns.
        .bind(serde_json::to_value(&memory.content)?)
        .bind(&memory.embedding)
        .bind(serde_json::to_value(&memory.metadata)?)
        .bind(memory.created_at)
        // Missing uniqueness flag defaults to false.
        .bind(memory.unique.unwrap_or(false))
        .execute(&self.pool)
        .await?;

        Ok(memory.id)
    }
1337
1338 async fn search_memories_by_embedding(
1339 &self,
1340 params: SearchMemoriesParams,
1341 ) -> Result<Vec<Memory>> {
1342 use tracing::{info, warn};
1343
1344 let expected_dim = *self.embedding_dimension.read().unwrap();
1346 let actual_dim = params.embedding.len();
1347
1348 if actual_dim != expected_dim {
1349 return Err(ZoeyError::vector_search(
1350 format!(
1351 "Embedding dimension mismatch for table '{}'",
1352 params.table_name
1353 ),
1354 actual_dim,
1355 expected_dim,
1356 ));
1357 }
1358
1359 let validated_table = validate_table_name(¶ms.table_name)?;
1361
1362 info!(
1363 "Searching memories by embedding in table '{}' with {} dimensions, limit: {}",
1364 validated_table, actual_dim, params.count
1365 );
1366
1367 let mut query = format!(
1369 "SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag,
1370 embedding <-> $1::vector AS similarity
1371 FROM {}
1372 WHERE embedding IS NOT NULL",
1373 validated_table
1374 );
1375
1376 let mut param_count = 2;
1377 if params.agent_id.is_some() {
1378 query.push_str(&format!(" AND agent_id = ${}", param_count));
1379 param_count += 1;
1380 }
1381 if params.room_id.is_some() {
1382 query.push_str(&format!(" AND room_id = ${}", param_count));
1383 param_count += 1;
1384 }
1385 if params.world_id.is_some() {
1386 query.push_str(&format!(" AND world_id = ${}", param_count));
1387 param_count += 1;
1388 }
1389 if params.entity_id.is_some() {
1390 query.push_str(&format!(" AND entity_id = ${}", param_count));
1391 param_count += 1;
1392 }
1393
1394 query.push_str(" ORDER BY similarity ASC");
1395 query.push_str(&format!(" LIMIT ${}", param_count));
1396
1397 let embedding_str = format!(
1399 "[{}]",
1400 params
1401 .embedding
1402 .iter()
1403 .map(|v| v.to_string())
1404 .collect::<Vec<_>>()
1405 .join(",")
1406 );
1407
1408 let mut sql_query = sqlx::query(&query).bind(&embedding_str);
1409
1410 if let Some(agent_id) = params.agent_id {
1411 sql_query = sql_query.bind(agent_id);
1412 }
1413 if let Some(room_id) = params.room_id {
1414 sql_query = sql_query.bind(room_id);
1415 }
1416 if let Some(world_id) = params.world_id {
1417 sql_query = sql_query.bind(world_id);
1418 }
1419 if let Some(entity_id) = params.entity_id {
1420 sql_query = sql_query.bind(entity_id);
1421 }
1422 sql_query = sql_query.bind(params.count as i64);
1423
1424 let rows = sql_query.fetch_all(&self.pool).await.map_err(|e| {
1425 warn!("Vector search failed: {}", e);
1426 ZoeyError::database(format!(
1427 "Vector search failed in table '{}': {}",
1428 params.table_name, e
1429 ))
1430 })?;
1431
1432 let mut memories = Vec::new();
1433 for row in rows {
1434 let id: uuid::Uuid = row.get("id");
1435 let entity_id: uuid::Uuid = row.get("entity_id");
1436 let agent_id: uuid::Uuid = row.get("agent_id");
1437 let room_id: uuid::Uuid = row.get("room_id");
1438 let content_value: serde_json::Value = row.get("content");
1439 let metadata_value: Option<serde_json::Value> = row.get("metadata");
1440 let created_at: i64 = row.get("created_at");
1441 let unique_flag: Option<bool> = row.get("unique_flag");
1442 let similarity: f32 = row.get("similarity");
1443
1444 let content: Content = serde_json::from_value(content_value)?;
1445 let metadata: Option<MemoryMetadata> = metadata_value
1446 .map(|v| serde_json::from_value(v))
1447 .transpose()?;
1448
1449 memories.push(Memory {
1450 id,
1451 entity_id,
1452 agent_id,
1453 room_id,
1454 content,
1455 embedding: None, metadata,
1457 created_at,
1458 unique: unique_flag,
1459 similarity: Some(similarity), });
1461 }
1462
1463 info!(
1464 "Found {} memories similar to query embedding",
1465 memories.len()
1466 );
1467 Ok(memories)
1468 }
1469
1470 async fn get_cached_embeddings(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1471 use tracing::{debug, info};
1472
1473 debug!(
1474 "Getting cached embeddings for agent_id: {:?}, room_id: {:?}, count: {:?}",
1475 params.agent_id, params.room_id, params.count
1476 );
1477
1478 let validated_table = validate_table_name(¶ms.table_name)?;
1480
1481 let mut query = format!(
1483 "SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag
1484 FROM {}
1485 WHERE embedding IS NOT NULL",
1486 validated_table
1487 );
1488
1489 let mut param_count = 1;
1490 if params.agent_id.is_some() {
1491 query.push_str(&format!(" AND agent_id = ${}", param_count));
1492 param_count += 1;
1493 }
1494 if params.room_id.is_some() {
1495 query.push_str(&format!(" AND room_id = ${}", param_count));
1496 param_count += 1;
1497 }
1498 if params.entity_id.is_some() {
1499 query.push_str(&format!(" AND entity_id = ${}", param_count));
1500 param_count += 1;
1501 }
1502 if params.unique.is_some() {
1503 query.push_str(&format!(" AND unique_flag = ${}", param_count));
1504 param_count += 1;
1505 }
1506
1507 query.push_str(" ORDER BY created_at DESC");
1508
1509 if let Some(limit) = params.count {
1510 query.push_str(&format!(" LIMIT ${}", param_count));
1511 }
1512
1513 let mut sql_query = sqlx::query(&query);
1515
1516 if let Some(agent_id) = params.agent_id {
1517 sql_query = sql_query.bind(agent_id);
1518 }
1519 if let Some(room_id) = params.room_id {
1520 sql_query = sql_query.bind(room_id);
1521 }
1522 if let Some(entity_id) = params.entity_id {
1523 sql_query = sql_query.bind(entity_id);
1524 }
1525 if let Some(unique) = params.unique {
1526 sql_query = sql_query.bind(unique);
1527 }
1528 if let Some(limit) = params.count {
1529 sql_query = sql_query.bind(limit as i64);
1530 }
1531
1532 let rows = sql_query.fetch_all(&self.pool).await?;
1533
1534 let mut memories = Vec::new();
1535 for row in rows {
1536 let id: uuid::Uuid = row.get("id");
1537 let entity_id: uuid::Uuid = row.get("entity_id");
1538 let agent_id: uuid::Uuid = row.get("agent_id");
1539 let room_id: uuid::Uuid = row.get("room_id");
1540 let content_value: serde_json::Value = row.get("content");
1541 let embedding_str: Option<String> = row.get("embedding");
1542 let metadata_value: Option<serde_json::Value> = row.get("metadata");
1543 let created_at: i64 = row.get("created_at");
1544 let unique_flag: Option<bool> = row.get("unique_flag");
1545
1546 let content: Content = serde_json::from_value(content_value)?;
1547
1548 let embedding = if let Some(emb_str) = embedding_str {
1550 let trimmed = emb_str.trim_matches(|c| c == '[' || c == ']');
1552 let vec: std::result::Result<Vec<f32>, _> = trimmed
1553 .split(',')
1554 .map(|s| s.trim().parse::<f32>())
1555 .collect();
1556 Some(vec.map_err(|e| {
1557 ZoeyError::database(format!("Failed to parse embedding: {}", e))
1558 })?)
1559 } else {
1560 None
1561 };
1562
1563 let metadata: Option<MemoryMetadata> = metadata_value
1564 .map(|v| serde_json::from_value(v))
1565 .transpose()?;
1566
1567 memories.push(Memory {
1568 id,
1569 entity_id,
1570 agent_id,
1571 room_id,
1572 content,
1573 embedding,
1574 metadata,
1575 created_at,
1576 unique: unique_flag,
1577 similarity: None,
1578 });
1579 }
1580
1581 info!(
1582 "Retrieved {} cached embeddings from {}",
1583 memories.len(),
1584 params.table_name
1585 );
1586 Ok(memories)
1587 }
1588
1589 async fn update_memory(&self, memory: &Memory) -> Result<bool> {
1590 let result = sqlx::query(
1591 "UPDATE memories SET content = $2, embedding = $3, metadata = $4 WHERE id = $1",
1592 )
1593 .bind(memory.id)
1594 .bind(serde_json::to_value(&memory.content)?)
1595 .bind(&memory.embedding)
1596 .bind(serde_json::to_value(&memory.metadata)?)
1597 .execute(&self.pool)
1598 .await?;
1599
1600 Ok(result.rows_affected() > 0)
1601 }
1602
1603 async fn remove_memory(&self, memory_id: UUID, _table_name: &str) -> Result<bool> {
1604 let result = sqlx::query("DELETE FROM memories WHERE id = $1")
1605 .bind(memory_id)
1606 .execute(&self.pool)
1607 .await?;
1608
1609 Ok(result.rows_affected() > 0)
1610 }
1611
1612 async fn remove_all_memories(&self, agent_id: UUID, _table_name: &str) -> Result<bool> {
1613 let result = sqlx::query("DELETE FROM memories WHERE agent_id = $1")
1614 .bind(agent_id)
1615 .execute(&self.pool)
1616 .await?;
1617
1618 Ok(result.rows_affected() > 0)
1619 }
1620
    /// Count memories matching the optional filters in `params`.
    ///
    /// Unlike the other dynamic queries in this adapter, this one accumulates
    /// values into a `PgArguments` buffer as the WHERE clause grows, so the
    /// placeholder index and the argument list stay in lockstep by
    /// construction.
    async fn count_memories(&self, params: MemoryQuery) -> Result<usize> {
        let mut query = String::from("SELECT COUNT(*) as count FROM memories WHERE 1=1");
        let mut bind_idx = 1;
        let mut query_args = sqlx::postgres::PgArguments::default();

        if let Some(agent_id) = params.agent_id {
            query.push_str(&format!(" AND agent_id = ${}", bind_idx));
            query_args
                .add(agent_id)
                .map_err(|e| ZoeyError::Database(e.to_string()))?;
            bind_idx += 1;
        }
        if let Some(room_id) = params.room_id {
            query.push_str(&format!(" AND room_id = ${}", bind_idx));
            query_args
                .add(room_id)
                .map_err(|e| ZoeyError::Database(e.to_string()))?;
            bind_idx += 1;
        }
        if let Some(entity_id) = params.entity_id {
            query.push_str(&format!(" AND entity_id = ${}", bind_idx));
            query_args
                .add(entity_id)
                .map_err(|e| ZoeyError::Database(e.to_string()))?;
            bind_idx += 1;
        }
        if let Some(unique) = params.unique {
            query.push_str(&format!(" AND unique_flag = ${}", bind_idx));
            query_args
                .add(unique)
                .map_err(|e| ZoeyError::Database(e.to_string()))?;
            bind_idx += 1;
        }

        let row = sqlx::query_with(&query, query_args)
            .fetch_one(&self.pool)
            .await?;

        let count: i64 = row.get("count");
        Ok(count as usize)
    }
1662
1663 async fn get_world(&self, world_id: UUID) -> Result<Option<World>> {
1664 let row = sqlx::query(
1665 "SELECT id, name, agent_id, server_id, metadata, created_at FROM worlds WHERE id = $1",
1666 )
1667 .bind(world_id)
1668 .fetch_optional(&self.pool)
1669 .await?;
1670 Ok(row.map(|row| World {
1671 id: row.get("id"),
1672 name: row.get("name"),
1673 agent_id: row.get("agent_id"),
1674 server_id: row.get("server_id"),
1675 metadata: row
1676 .try_get("metadata")
1677 .ok()
1678 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1679 .unwrap_or_default(),
1680 created_at: row.get("created_at"),
1681 }))
1682 }
1683
    /// Upsert a world: insert it, or on id conflict refresh its name,
    /// server_id, and metadata. `agent_id` and `created_at` are preserved
    /// from the original row on conflict (they are not in the UPDATE SET).
    async fn ensure_world(&self, world: &World) -> Result<()> {
        sqlx::query(
            "INSERT INTO worlds (id, name, agent_id, server_id, metadata, created_at)
             VALUES ($1, $2, $3, $4, $5, $6)
             ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, server_id = EXCLUDED.server_id, metadata = EXCLUDED.metadata"
        )
        .bind(world.id)
        .bind(&world.name)
        .bind(world.agent_id)
        .bind(&world.server_id)
        .bind(serde_json::to_value(&world.metadata)?)
        // Missing created_at falls back to "now" (unix seconds).
        .bind(world.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1700
    /// Load a room by id; returns `Ok(None)` when the id is unknown.
    ///
    /// The `type` column stores an upper-case tag that is decoded into
    /// `ChannelType`; unrecognized tags map to `ChannelType::Unknown`.
    /// (This tag mapping is duplicated in get_rooms/get_rooms_for_agent and
    /// inverted in create_room — keep them in sync.)
    async fn get_room(&self, room_id: UUID) -> Result<Option<Room>> {
        let row = sqlx::query(
            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE id = $1"
        )
        .bind(room_id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|row| Room {
            id: row.get("id"),
            agent_id: row.get("agent_id"),
            name: row.get("name"),
            source: row.get("source"),
            channel_type: {
                let t: String = row.get("type");
                match t.as_str() {
                    "DM" => ChannelType::Dm,
                    "VOICE_DM" => ChannelType::VoiceDm,
                    "GROUP_DM" => ChannelType::GroupDm,
                    "GUILD_TEXT" => ChannelType::GuildText,
                    "GUILD_VOICE" => ChannelType::GuildVoice,
                    "THREAD" => ChannelType::Thread,
                    "FEED" => ChannelType::Feed,
                    "SELF" => ChannelType::SelfChannel,
                    "API" => ChannelType::Api,
                    "WORLD" => ChannelType::World,
                    _ => ChannelType::Unknown,
                }
            },
            channel_id: row.get("channel_id"),
            server_id: row.get("server_id"),
            world_id: row.get("world_id"),
            // Malformed/missing metadata degrades to the default value.
            metadata: row
                .try_get("metadata")
                .ok()
                .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
                .unwrap_or_default(),
            created_at: row.get("created_at"),
        }))
    }
1740
    /// Insert a room row and return its id.
    ///
    /// `ChannelType` is encoded to the upper-case tag stored in the `type`
    /// column — the inverse of the decoding in get_room/get_rooms; keep both
    /// mappings in sync when adding variants.
    async fn create_room(&self, room: &Room) -> Result<UUID> {
        sqlx::query(
            "INSERT INTO rooms (id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
        )
        .bind(room.id)
        .bind(room.agent_id)
        .bind(&room.name)
        .bind(&room.source)
        .bind(match room.channel_type {
            ChannelType::Dm => "DM",
            ChannelType::VoiceDm => "VOICE_DM",
            ChannelType::GroupDm => "GROUP_DM",
            ChannelType::GuildText => "GUILD_TEXT",
            ChannelType::GuildVoice => "GUILD_VOICE",
            ChannelType::Thread => "THREAD",
            ChannelType::Feed => "FEED",
            ChannelType::SelfChannel => "SELF",
            ChannelType::Api => "API",
            ChannelType::World => "WORLD",
            ChannelType::Unknown => "UNKNOWN",
        })
        .bind(&room.channel_id)
        .bind(&room.server_id)
        .bind(room.world_id)
        .bind(serde_json::to_value(&room.metadata)?)
        // Missing created_at falls back to "now" (unix seconds).
        .bind(room.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
        .execute(&self.pool)
        .await?;
        Ok(room.id)
    }
1772
    /// List all rooms belonging to a world.
    ///
    /// Shares the `type`-tag decoding and lenient metadata handling with
    /// get_room; unrecognized tags map to `ChannelType::Unknown`.
    async fn get_rooms(&self, world_id: UUID) -> Result<Vec<Room>> {
        let rows = sqlx::query(
            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE world_id = $1"
        )
        .bind(world_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|row| Room {
                id: row.get("id"),
                agent_id: row.get("agent_id"),
                name: row.get("name"),
                source: row.get("source"),
                channel_type: {
                    let t: String = row.get("type");
                    match t.as_str() {
                        "DM" => ChannelType::Dm,
                        "VOICE_DM" => ChannelType::VoiceDm,
                        "GROUP_DM" => ChannelType::GroupDm,
                        "GUILD_TEXT" => ChannelType::GuildText,
                        "GUILD_VOICE" => ChannelType::GuildVoice,
                        "THREAD" => ChannelType::Thread,
                        "FEED" => ChannelType::Feed,
                        "SELF" => ChannelType::SelfChannel,
                        "API" => ChannelType::Api,
                        "WORLD" => ChannelType::World,
                        _ => ChannelType::Unknown,
                    }
                },
                channel_id: row.get("channel_id"),
                server_id: row.get("server_id"),
                world_id: row.get("world_id"),
                // Malformed/missing metadata degrades to the default value.
                metadata: row
                    .try_get("metadata")
                    .ok()
                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
                    .unwrap_or_default(),
                created_at: row.get("created_at"),
            })
            .collect())
    }
1815
1816 async fn get_rooms_for_agent(&self, agent_id: UUID) -> Result<Vec<Room>> {
1817 let rows = sqlx::query(
1818 "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE agent_id = $1"
1819 )
1820 .bind(agent_id)
1821 .fetch_all(&self.pool)
1822 .await?;
1823 Ok(rows
1824 .into_iter()
1825 .map(|row| Room {
1826 id: row.get("id"),
1827 agent_id: row.get("agent_id"),
1828 name: row.get("name"),
1829 source: row.get("source"),
1830 channel_type: {
1831 let t: String = row.get("type");
1832 match t.as_str() {
1833 "DM" => ChannelType::Dm,
1834 "VOICE_DM" => ChannelType::VoiceDm,
1835 "GROUP_DM" => ChannelType::GroupDm,
1836 "GUILD_TEXT" => ChannelType::GuildText,
1837 "GUILD_VOICE" => ChannelType::GuildVoice,
1838 "THREAD" => ChannelType::Thread,
1839 "FEED" => ChannelType::Feed,
1840 "SELF" => ChannelType::SelfChannel,
1841 "API" => ChannelType::Api,
1842 "WORLD" => ChannelType::World,
1843 _ => ChannelType::Unknown,
1844 }
1845 },
1846 channel_id: row.get("channel_id"),
1847 server_id: row.get("server_id"),
1848 world_id: row.get("world_id"),
1849 metadata: row
1850 .try_get("metadata")
1851 .ok()
1852 .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1853 .unwrap_or_default(),
1854 created_at: row.get("created_at"),
1855 })
1856 .collect())
1857 }
1858
1859 async fn add_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1860 let result = sqlx::query(
1861 "INSERT INTO participants (entity_id, room_id, joined_at, metadata) VALUES ($1, $2, $3, $4)"
1862 )
1863 .bind(entity_id)
1864 .bind(room_id)
1865 .bind(chrono::Utc::now().timestamp())
1866 .bind(serde_json::json!({}))
1867 .execute(&self.pool)
1868 .await?;
1869 Ok(result.rows_affected() > 0)
1870 }
1871
1872 async fn remove_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1873 let result = sqlx::query("DELETE FROM participants WHERE entity_id = $1 AND room_id = $2")
1874 .bind(entity_id)
1875 .bind(room_id)
1876 .execute(&self.pool)
1877 .await?;
1878 Ok(result.rows_affected() > 0)
1879 }
1880
    /// List every participant of a room.
    async fn get_participants(&self, room_id: UUID) -> Result<Vec<Participant>> {
        let rows = sqlx::query(
            "SELECT entity_id, room_id, joined_at, metadata FROM participants WHERE room_id = $1",
        )
        .bind(room_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|row| Participant {
                entity_id: row.get("entity_id"),
                room_id: row.get("room_id"),
                joined_at: row.get("joined_at"),
                // Malformed/missing metadata degrades to the default value.
                metadata: row
                    .try_get("metadata")
                    .ok()
                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
                    .unwrap_or_default(),
            })
            .collect())
    }
1902
    /// Insert a relationship row between two entities. Returns `true` when
    /// the row was inserted (no upsert — duplicates error via `?`).
    async fn create_relationship(&self, relationship: &Relationship) -> Result<bool> {
        let result = sqlx::query(
            "INSERT INTO relationships (entity_id_a, entity_id_b, type, agent_id, metadata, created_at)
             VALUES ($1, $2, $3, $4, $5, $6)"
        )
        .bind(relationship.entity_id_a)
        .bind(relationship.entity_id_b)
        // Struct field `relationship_type` maps to the `type` column.
        .bind(&relationship.relationship_type)
        .bind(relationship.agent_id)
        .bind(serde_json::to_value(&relationship.metadata)?)
        // Missing created_at falls back to "now" (unix seconds).
        .bind(relationship.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
        .execute(&self.pool)
        .await?;
        Ok(result.rows_affected() > 0)
    }
1918
    /// Fetch the relationship between two entities, treating the pair as
    /// unordered — (a, b) and (b, a) both match. Returns `Ok(None)` when no
    /// relationship exists in either direction.
    async fn get_relationship(
        &self,
        entity_id_a: UUID,
        entity_id_b: UUID,
    ) -> Result<Option<Relationship>> {
        let row = sqlx::query(
            "SELECT entity_id_a, entity_id_b, type, agent_id, metadata, created_at
             FROM relationships WHERE (entity_id_a = $1 AND entity_id_b = $2) OR (entity_id_a = $2 AND entity_id_b = $1)"
        )
        .bind(entity_id_a)
        .bind(entity_id_b)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|row| Relationship {
            entity_id_a: row.get("entity_id_a"),
            entity_id_b: row.get("entity_id_b"),
            relationship_type: row.get("type"),
            agent_id: row.get("agent_id"),
            // Malformed/missing metadata degrades to the default value.
            metadata: row
                .try_get("metadata")
                .ok()
                .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
                .unwrap_or_default(),
            created_at: row.get("created_at"),
        }))
    }
1945
    /// Insert a task row and return its id.
    ///
    /// `TaskStatus` is encoded to the upper-case tag stored in `status` —
    /// the inverse of the decoding in get_task; keep both in sync.
    async fn create_task(&self, task: &Task) -> Result<UUID> {
        let status_str = match task.status {
            TaskStatus::Pending => "PENDING",
            TaskStatus::Running => "RUNNING",
            TaskStatus::Completed => "COMPLETED",
            TaskStatus::Failed => "FAILED",
            TaskStatus::Cancelled => "CANCELLED",
        };
        sqlx::query(
            "INSERT INTO tasks (id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
        )
        .bind(task.id)
        .bind(task.agent_id)
        .bind(&task.task_type)
        .bind(&task.data)
        .bind(status_str)
        .bind(task.priority)
        .bind(task.scheduled_at)
        .bind(task.executed_at)
        .bind(task.retry_count)
        .bind(task.max_retries)
        .bind(&task.error)
        .bind(task.created_at)
        .bind(task.updated_at)
        .execute(&self.pool)
        .await?;
        Ok(task.id)
    }
1975
    /// Update every mutable field of an existing task. Returns `true` when a
    /// row with the given id was updated.
    ///
    /// NOTE(review): a `None` `updated_at` falls back to "now", unlike
    /// create_task which persists the value as-is — confirm intentional.
    async fn update_task(&self, task: &Task) -> Result<bool> {
        let status_str = match task.status {
            TaskStatus::Pending => "PENDING",
            TaskStatus::Running => "RUNNING",
            TaskStatus::Completed => "COMPLETED",
            TaskStatus::Failed => "FAILED",
            TaskStatus::Cancelled => "CANCELLED",
        };
        let result = sqlx::query(
            "UPDATE tasks SET data = $2, status = $3, priority = $4, scheduled_at = $5, executed_at = $6, retry_count = $7, max_retries = $8, error = $9, updated_at = $10 WHERE id = $1"
        )
        .bind(task.id)
        .bind(&task.data)
        .bind(status_str)
        .bind(task.priority)
        .bind(task.scheduled_at)
        .bind(task.executed_at)
        .bind(task.retry_count)
        .bind(task.max_retries)
        .bind(&task.error)
        .bind(task.updated_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
        .execute(&self.pool)
        .await?;
        Ok(result.rows_affected() > 0)
    }
2001
    /// Load a task by id; returns `Ok(None)` when the id is unknown.
    ///
    /// The `status` tag is decoded back into `TaskStatus`; unrecognized tags
    /// fall back to `Pending` rather than erroring.
    async fn get_task(&self, task_id: UUID) -> Result<Option<Task>> {
        let row = sqlx::query(
            "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at FROM tasks WHERE id = $1"
        )
        .bind(task_id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|row| {
            let status_str: String = row.get("status");
            let status = match status_str.as_str() {
                "PENDING" => TaskStatus::Pending,
                "RUNNING" => TaskStatus::Running,
                "COMPLETED" => TaskStatus::Completed,
                "FAILED" => TaskStatus::Failed,
                "CANCELLED" => TaskStatus::Cancelled,
                // Unknown tag: default to Pending so the task is retried
                // rather than dropped.
                _ => TaskStatus::Pending,
            };
            Task {
                id: row.get("id"),
                agent_id: row.get("agent_id"),
                task_type: row.get("task_type"),
                data: row.get("data"),
                status,
                priority: row.get("priority"),
                scheduled_at: row.get("scheduled_at"),
                executed_at: row.get("executed_at"),
                retry_count: row.get("retry_count"),
                max_retries: row.get("max_retries"),
                error: row.get("error"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
            }
        }))
    }
2036
    /// List an agent's pending tasks, earliest-scheduled first (NULL
    /// schedules last), then oldest-created first.
    ///
    /// `status` is hardcoded to `Pending` in the mapped struct — valid
    /// because the WHERE clause already restricts rows to status = 'PENDING'.
    async fn get_pending_tasks(&self, agent_id: UUID) -> Result<Vec<Task>> {
        let rows = sqlx::query(
            "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at
             FROM tasks WHERE agent_id = $1 AND status = 'PENDING' ORDER BY scheduled_at NULLS LAST, created_at ASC"
        )
        .bind(agent_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|row| Task {
                id: row.get("id"),
                agent_id: row.get("agent_id"),
                task_type: row.get("task_type"),
                data: row.get("data"),
                status: TaskStatus::Pending,
                priority: row.get("priority"),
                scheduled_at: row.get("scheduled_at"),
                executed_at: row.get("executed_at"),
                retry_count: row.get("retry_count"),
                max_retries: row.get("max_retries"),
                error: row.get("error"),
                created_at: row.get("created_at"),
                updated_at: row.get("updated_at"),
            })
            .collect())
    }
2064
    /// Append a single log row. The row id is database-assigned; the caller's
    /// `Log::id` (if any) is not persisted here.
    async fn log(&self, log: &Log) -> Result<()> {
        sqlx::query(
            "INSERT INTO logs (entity_id, room_id, body, type, created_at)
             VALUES ($1, $2, $3, $4, $5)",
        )
        .bind(log.entity_id)
        .bind(log.room_id)
        .bind(&log.body)
        // Struct field `log_type` maps to the `type` column.
        .bind(&log.log_type)
        .bind(log.created_at)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
2080
    /// Query log rows with optional filters (entity, room, type), newest
    /// first, with optional LIMIT/OFFSET paging.
    ///
    /// Placeholder indices are allocated while the statement is assembled;
    /// the binds below must follow that exact same order.
    async fn get_logs(&self, params: LogQuery) -> Result<Vec<Log>> {
        let mut query = String::from(
            "SELECT id, entity_id, room_id, body, type, created_at FROM logs WHERE 1=1",
        );
        let mut idx = 1;
        if params.entity_id.is_some() {
            query.push_str(&format!(" AND entity_id = ${}", idx));
            idx += 1;
        }
        if params.room_id.is_some() {
            query.push_str(&format!(" AND room_id = ${}", idx));
            idx += 1;
        }
        if params.log_type.is_some() {
            query.push_str(&format!(" AND type = ${}", idx));
            idx += 1;
        }
        query.push_str(" ORDER BY created_at DESC");
        if params.limit.is_some() {
            query.push_str(&format!(" LIMIT ${}", idx));
            idx += 1;
        }
        if params.offset.is_some() {
            // Final placeholder — no further increment needed.
            query.push_str(&format!(" OFFSET ${}", idx));
        }
        // Bind values in the same order their placeholders were appended.
        let mut sql = sqlx::query(&query);
        if let Some(eid) = params.entity_id {
            sql = sql.bind(eid);
        }
        if let Some(rid) = params.room_id {
            sql = sql.bind(rid);
        }
        if let Some(t) = params.log_type {
            sql = sql.bind(t);
        }
        if let Some(l) = params.limit {
            sql = sql.bind(l as i64);
        }
        if let Some(o) = params.offset {
            sql = sql.bind(o as i64);
        }
        let rows = sql.fetch_all(&self.pool).await?;
        Ok(rows
            .into_iter()
            .map(|row| Log {
                id: Some(row.get("id")),
                entity_id: row.get("entity_id"),
                room_id: row.get("room_id"),
                body: row.get("body"),
                log_type: row.get("type"),
                created_at: row.get("created_at"),
            })
            .collect())
    }
2135
    /// Summarize agent runs backed by the `llm_costs` table: one summary per
    /// cost row, with `success` mapped to Completed/Error status.
    ///
    /// Two statements share the same WHERE fragment (`base`): a COUNT for the
    /// total and a page query. The placeholder counter `idx` keeps advancing
    /// after `base` is built so that LIMIT/OFFSET in the page query get the
    /// next free indices — correct because the page query re-binds the same
    /// filter values for $1/$2 before limit/offset.
    async fn get_agent_run_summaries(
        &self,
        params: RunSummaryQuery,
    ) -> Result<AgentRunSummaryResult> {
        let mut base = String::from("FROM llm_costs WHERE 1=1");
        let mut idx = 1;
        if params.agent_id.is_some() {
            base.push_str(&format!(" AND agent_id = ${}", idx));
            idx += 1;
        }
        if params.status.is_some() {
            // Status filter is stored as the boolean `success` column.
            base.push_str(&format!(" AND success = ${}", idx));
            idx += 1;
        }
        let count_sql = format!("SELECT COUNT(*) as count {}", base);
        let mut count_q = sqlx::query(&count_sql);
        if let Some(aid) = params.agent_id {
            count_q = count_q.bind(aid);
        }
        if let Some(st) = params.status {
            // Only Completed maps to success = true; all others to false.
            count_q = count_q.bind(matches!(st, RunStatus::Completed));
        }
        let count_row = count_q.fetch_one(&self.pool).await?;
        let total: i64 = count_row.get("count");

        let mut query = format!(
            "SELECT id, timestamp, success, conversation_id {} ORDER BY timestamp DESC",
            base
        );
        if params.limit.is_some() {
            query.push_str(&format!(" LIMIT ${}", idx));
            idx += 1;
        }
        if params.offset.is_some() {
            query.push_str(&format!(" OFFSET ${}", idx));
        }
        // Bind order: filters first (same as placeholders in `base`), then
        // limit, then offset.
        let mut sql = sqlx::query(&query);
        if let Some(aid) = params.agent_id {
            sql = sql.bind(aid);
        }
        if let Some(st) = params.status {
            sql = sql.bind(matches!(st, RunStatus::Completed));
        }
        if let Some(l) = params.limit {
            sql = sql.bind(l as i64);
        }
        if let Some(o) = params.offset {
            sql = sql.bind(o as i64);
        }
        let rows = sql.fetch_all(&self.pool).await?;
        let mut runs = Vec::new();
        for row in rows {
            let id: uuid::Uuid = row.get("id");
            let ts: chrono::DateTime<chrono::Utc> = row.get("timestamp");
            let success: bool = row.get("success");
            let status = if success {
                RunStatus::Completed
            } else {
                RunStatus::Error
            };
            runs.push(AgentRunSummary {
                run_id: id.to_string(),
                status,
                // A cost row is a point event: start == end, no duration.
                started_at: Some(ts.timestamp()),
                ended_at: Some(ts.timestamp()),
                duration_ms: None,
                message_id: None,
                room_id: None,
                entity_id: None,
                metadata: None,
                counts: None,
            });
        }
        Ok(AgentRunSummaryResult {
            runs,
            total: total as usize,
            // More pages exist when this page's end (offset + limit) falls
            // short of the total; with no limit, everything was returned.
            has_more: params
                .limit
                .map(|l| (l as i64) + (params.offset.unwrap_or(0) as i64) < total)
                .unwrap_or(false),
        })
    }
2218}
2219
impl PostgresAdapter {
    /// Persist one LLM cost/observability record into `llm_costs`.
    ///
    /// Token and latency counters are widened to i64 for Postgres BIGINT
    /// columns (`as i64` — the source values are unsigned, so this is a pure
    /// widening for any realistic magnitude).
    pub async fn persist_llm_cost(&self, record: LLMCostRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO llm_costs (id, timestamp, agent_id, user_id, conversation_id, action_name, evaluator_name, provider, model, temperature, prompt_tokens, completion_tokens, total_tokens, cached_tokens, input_cost_usd, output_cost_usd, total_cost_usd, latency_ms, ttft_ms, success, error, prompt_hash, prompt_preview)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)"
        )
        .bind(record.id)
        .bind(record.timestamp)
        .bind(record.agent_id)
        .bind(record.user_id)
        .bind(record.conversation_id)
        .bind(record.action_name)
        .bind(record.evaluator_name)
        .bind(record.provider)
        .bind(record.model)
        .bind(record.temperature)
        .bind(record.prompt_tokens as i64)
        .bind(record.completion_tokens as i64)
        .bind(record.total_tokens as i64)
        .bind(record.cached_tokens.map(|v| v as i64))
        .bind(record.input_cost_usd)
        .bind(record.output_cost_usd)
        .bind(record.total_cost_usd)
        .bind(record.latency_ms as i64)
        .bind(record.ttft_ms.map(|v| v as i64))
        .bind(record.success)
        .bind(record.error)
        .bind(record.prompt_hash)
        .bind(record.prompt_preview)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
}
2254
#[cfg(test)]
mod tests {
    use super::*;

    /// `validate_table_name` must accept allow-listed names and reject
    /// injection shapes, unknown tables, and overlong names.
    #[test]
    fn test_validate_table_name() {
        assert_eq!(validate_table_name("memories").unwrap(), "memories");
        assert_eq!(validate_table_name("agents").unwrap(), "agents");
        // Well-formed but not on the allow list.
        assert!(validate_table_name("users").is_err());
        // Classic injection shape: punctuation is rejected outright.
        assert!(validate_table_name("memories; DROP TABLE agents").is_err());
        // Length cap is 64 characters.
        assert!(validate_table_name(&"a".repeat(65)).is_err());
        assert!(validate_table_name("").is_err());
    }

    /// `validate_identifier` enforces only the character and length rules
    /// (no allow list).
    #[test]
    fn test_validate_identifier() {
        assert_eq!(validate_identifier("column_1").unwrap(), "column_1");
        assert!(validate_identifier("bad-name").is_err());
        assert!(validate_identifier("name with space").is_err());
        assert!(validate_identifier(&"x".repeat(65)).is_err());
    }
}
2263}