zoey_storage_sql/
postgres.rs

1//! PostgreSQL database adapter
2
3use async_trait::async_trait;
4use zoey_core::observability::types::LLMCostRecord;
5use zoey_core::{types::*, ZoeyError, Result};
6use sqlx::{postgres::PgPoolOptions, Arguments, PgPool, Row};
7use tracing::{debug, info, warn};
8
/// Whitelist of table names accepted by [`validate_table_name`].
///
/// Any name that is not listed here is rejected, which is the adapter's
/// primary guard against SQL injection through table identifiers.
const ALLOWED_TABLES: &[&str] = &[
    "memories", "agents", "entities", "worlds", "rooms",
    "relationships", "goals", "logs", "cache", "components",
    "embeddings", "documents", "conversations",
];
25
26/// Validate table name to prevent SQL injection
27///
28/// This function provides defense-in-depth against SQL injection:
29/// 1. Whitelist validation - only known tables are allowed
30/// 2. Character validation - only alphanumeric and underscore allowed
31/// 3. Length limit - prevents buffer overflow attacks
32fn validate_table_name(name: &str) -> Result<&str> {
33    // Length check (defense against buffer overflow)
34    if name.len() > 64 {
35        warn!("Rejected table name due to length: {} chars", name.len());
36        return Err(ZoeyError::validation(format!(
37            "Table name too long: {} (max 64 characters)",
38            name.len()
39        )));
40    }
41
42    // Character validation (alphanumeric and underscore only)
43    if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
44        warn!("Rejected table name with invalid characters: {}", name);
45        return Err(ZoeyError::validation(format!(
46            "Invalid table name '{}': only alphanumeric characters and underscores allowed",
47            name
48        )));
49    }
50
51    // Whitelist validation
52    if !ALLOWED_TABLES.contains(&name) {
53        warn!("Rejected unknown table name: {}", name);
54        return Err(ZoeyError::validation(format!(
55            "Unknown table name '{}': not in allowed list",
56            name
57        )));
58    }
59
60    Ok(name)
61}
62
63/// Validate SQL identifier (table/column) for safety
64fn validate_identifier(name: &str) -> Result<&str> {
65    if name.len() > 64 {
66        warn!("Rejected identifier due to length: {} chars", name.len());
67        return Err(ZoeyError::validation(format!(
68            "Identifier too long: {} (max 64 characters)",
69            name.len()
70        )));
71    }
72    if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
73        warn!("Rejected identifier with invalid characters: {}", name);
74        return Err(ZoeyError::validation(format!(
75            "Invalid identifier '{}': only alphanumeric characters and underscores allowed",
76            name
77        )));
78    }
79    Ok(name)
80}
81
82/// PostgreSQL database adapter
83pub struct PostgresAdapter {
84    pool: PgPool,
85    embedding_dimension: std::sync::RwLock<usize>,
86}
87
88impl PostgresAdapter {
89    /// Set current agent context for RLS policies
90    pub async fn set_current_agent(&self, agent_id: uuid::Uuid) -> Result<()> {
91        sqlx::query("SELECT set_config('app.current_agent_id', $1, false)")
92            .bind(agent_id.to_string())
93            .execute(&self.pool)
94            .await?;
95        Ok(())
96    }
97    /// Create a new PostgreSQL adapter
98    pub async fn new(database_url: &str) -> Result<Self> {
99        info!("Connecting to PostgreSQL database...");
100
101        let pool = PgPoolOptions::new()
102            .max_connections(10)
103            .connect(database_url)
104            .await
105            .map_err(|e| ZoeyError::DatabaseSqlx(e))?;
106
107        Ok(Self {
108            pool,
109            embedding_dimension: std::sync::RwLock::new(1536), // Default OpenAI embedding dimension
110        })
111    }
112
113    /// Create with custom pool options
114    pub async fn with_pool(pool: PgPool) -> Self {
115        Self {
116            pool,
117            embedding_dimension: std::sync::RwLock::new(1536),
118        }
119    }
120
121    /// Initialize database schema
122    async fn init_schema(&self) -> Result<()> {
123        debug!("Initializing database schema...");
124
125        // Ensure pgvector extension is available
126        sqlx::query("CREATE EXTENSION IF NOT EXISTS vector")
127            .execute(&self.pool)
128            .await
129            .ok(); // Ignore error if extension already exists or can't be created
130
131        // ============================================================
132        // CREATE TABLES
133        // ============================================================
134
135        // Agents table (root entity - no foreign keys)
136        // Note: Using BIGINT for timestamps (Unix epoch) for Rust i64 compatibility
137        sqlx::query(
138            r#"
139            CREATE TABLE IF NOT EXISTS agents (
140                id UUID PRIMARY KEY,
141                name TEXT NOT NULL,
142                character JSONB NOT NULL,
143                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
144                updated_at BIGINT
145            )
146        "#,
147        )
148        .execute(&self.pool)
149        .await?;
150
151        // Entities table (belongs to agent)
152        sqlx::query(
153            r#"
154            CREATE TABLE IF NOT EXISTS entities (
155                id UUID PRIMARY KEY,
156                agent_id UUID NOT NULL,
157                name TEXT,
158                username TEXT,
159                email TEXT,
160                avatar_url TEXT,
161                metadata JSONB DEFAULT '{}',
162                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
163            )
164        "#,
165        )
166        .execute(&self.pool)
167        .await?;
168
169        // Worlds table (belongs to agent)
170        sqlx::query(
171            r#"
172            CREATE TABLE IF NOT EXISTS worlds (
173                id UUID PRIMARY KEY,
174                name TEXT NOT NULL,
175                agent_id UUID NOT NULL,
176                server_id TEXT,
177                metadata JSONB DEFAULT '{}',
178                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
179            )
180        "#,
181        )
182        .execute(&self.pool)
183        .await?;
184
185        // Rooms table (belongs to world and optionally agent)
186        sqlx::query(
187            r#"
188            CREATE TABLE IF NOT EXISTS rooms (
189                id UUID PRIMARY KEY,
190                agent_id UUID,
191                name TEXT NOT NULL,
192                source TEXT NOT NULL,
193                type TEXT NOT NULL,
194                channel_id TEXT,
195                server_id TEXT,
196                world_id UUID NOT NULL,
197                metadata JSONB DEFAULT '{}',
198                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
199            )
200        "#,
201        )
202        .execute(&self.pool)
203        .await?;
204
205        // Memories table (core data - belongs to entity, agent, room)
206        sqlx::query(
207            r#"
208            CREATE TABLE IF NOT EXISTS memories (
209                id UUID PRIMARY KEY,
210                entity_id UUID NOT NULL,
211                agent_id UUID NOT NULL,
212                room_id UUID NOT NULL,
213                content JSONB NOT NULL,
214                embedding vector,
215                metadata JSONB,
216                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
217                unique_flag BOOLEAN DEFAULT FALSE
218            )
219        "#,
220        )
221        .execute(&self.pool)
222        .await?;
223
224        // Participants junction table (entity <-> room)
225        sqlx::query(
226            r#"
227            CREATE TABLE IF NOT EXISTS participants (
228                entity_id UUID NOT NULL,
229                room_id UUID NOT NULL,
230                joined_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
231                metadata JSONB DEFAULT '{}',
232                PRIMARY KEY (entity_id, room_id)
233            )
234        "#,
235        )
236        .execute(&self.pool)
237        .await?;
238
239        // Relationships table (entity <-> entity with type)
240        // NOTE: Primary key includes type to allow multiple relationship types between same entities
241        sqlx::query(
242            r#"
243            CREATE TABLE IF NOT EXISTS relationships (
244                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
245                entity_id_a UUID NOT NULL,
246                entity_id_b UUID NOT NULL,
247                type TEXT NOT NULL,
248                agent_id UUID NOT NULL,
249                metadata JSONB DEFAULT '{}',
250                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
251                UNIQUE (entity_id_a, entity_id_b, type)
252            )
253        "#,
254        )
255        .execute(&self.pool)
256        .await?;
257
258        // Components table (ECS-style components attached to entities)
259        sqlx::query(
260            r#"
261            CREATE TABLE IF NOT EXISTS components (
262                id UUID PRIMARY KEY,
263                entity_id UUID NOT NULL,
264                world_id UUID NOT NULL,
265                source_entity_id UUID,
266                type TEXT NOT NULL,
267                data JSONB NOT NULL,
268                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
269                updated_at BIGINT
270            )
271        "#,
272        )
273        .execute(&self.pool)
274        .await?;
275
276        // Tasks table (deferred/scheduled work)
277        sqlx::query(
278            r#"
279            CREATE TABLE IF NOT EXISTS tasks (
280                id UUID PRIMARY KEY,
281                agent_id UUID NOT NULL,
282                task_type TEXT NOT NULL,
283                data JSONB NOT NULL,
284                status TEXT NOT NULL DEFAULT 'PENDING',
285                priority INTEGER DEFAULT 0,
286                scheduled_at BIGINT,
287                executed_at BIGINT,
288                retry_count INTEGER DEFAULT 0,
289                max_retries INTEGER DEFAULT 3,
290                error TEXT,
291                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint,
292                updated_at BIGINT
293            )
294        "#,
295        )
296        .execute(&self.pool)
297        .await?;
298
299        // Logs table (audit/debug logs)
300        sqlx::query(
301            r#"
302            CREATE TABLE IF NOT EXISTS logs (
303                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
304                entity_id UUID NOT NULL,
305                room_id UUID,
306                body JSONB NOT NULL,
307                type TEXT NOT NULL,
308                created_at BIGINT NOT NULL DEFAULT extract(epoch from now())::bigint
309            )
310        "#,
311        )
312        .execute(&self.pool)
313        .await?;
314
315        // ============================================================
316        // ADD FOREIGN KEY CONSTRAINTS (idempotent via DO block)
317        // ============================================================
318
319        // Helper: Add FK constraint if it doesn't exist
320        sqlx::query(
321            r#"
322            DO $$
323            BEGIN
324                -- entities -> agents
325                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_entities_agent') THEN
326                    ALTER TABLE entities ADD CONSTRAINT fk_entities_agent
327                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
328                END IF;
329
330                -- worlds -> agents
331                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_worlds_agent') THEN
332                    ALTER TABLE worlds ADD CONSTRAINT fk_worlds_agent
333                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
334                END IF;
335
336                -- rooms -> worlds
337                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_rooms_world') THEN
338                    ALTER TABLE rooms ADD CONSTRAINT fk_rooms_world
339                        FOREIGN KEY (world_id) REFERENCES worlds(id) ON DELETE CASCADE;
340                END IF;
341
342                -- rooms -> agents (optional)
343                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_rooms_agent') THEN
344                    ALTER TABLE rooms ADD CONSTRAINT fk_rooms_agent
345                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE SET NULL;
346                END IF;
347
348                -- memories -> entities
349                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_entity') THEN
350                    ALTER TABLE memories ADD CONSTRAINT fk_memories_entity
351                        FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
352                END IF;
353
354                -- memories -> agents
355                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_agent') THEN
356                    ALTER TABLE memories ADD CONSTRAINT fk_memories_agent
357                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
358                END IF;
359
360                -- memories -> rooms
361                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_memories_room') THEN
362                    ALTER TABLE memories ADD CONSTRAINT fk_memories_room
363                        FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE CASCADE;
364                END IF;
365
366                -- participants -> entities
367                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_participants_entity') THEN
368                    ALTER TABLE participants ADD CONSTRAINT fk_participants_entity
369                        FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
370                END IF;
371
372                -- participants -> rooms
373                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_participants_room') THEN
374                    ALTER TABLE participants ADD CONSTRAINT fk_participants_room
375                        FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE CASCADE;
376                END IF;
377
378                -- relationships -> entities (both sides)
379                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_entity_a') THEN
380                    ALTER TABLE relationships ADD CONSTRAINT fk_relationships_entity_a
381                        FOREIGN KEY (entity_id_a) REFERENCES entities(id) ON DELETE CASCADE;
382                END IF;
383
384                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_entity_b') THEN
385                    ALTER TABLE relationships ADD CONSTRAINT fk_relationships_entity_b
386                        FOREIGN KEY (entity_id_b) REFERENCES entities(id) ON DELETE CASCADE;
387                END IF;
388
389                -- relationships -> agents
390                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_relationships_agent') THEN
391                    ALTER TABLE relationships ADD CONSTRAINT fk_relationships_agent
392                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
393                END IF;
394
395                -- components -> entities
396                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_entity') THEN
397                    ALTER TABLE components ADD CONSTRAINT fk_components_entity
398                        FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
399                END IF;
400
401                -- components -> worlds
402                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_world') THEN
403                    ALTER TABLE components ADD CONSTRAINT fk_components_world
404                        FOREIGN KEY (world_id) REFERENCES worlds(id) ON DELETE CASCADE;
405                END IF;
406
407                -- components -> entities (source)
408                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_components_source_entity') THEN
409                    ALTER TABLE components ADD CONSTRAINT fk_components_source_entity
410                        FOREIGN KEY (source_entity_id) REFERENCES entities(id) ON DELETE SET NULL;
411                END IF;
412
413                -- tasks -> agents
414                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_tasks_agent') THEN
415                    ALTER TABLE tasks ADD CONSTRAINT fk_tasks_agent
416                        FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE;
417                END IF;
418
419                -- logs -> entities
420                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_logs_entity') THEN
421                    ALTER TABLE logs ADD CONSTRAINT fk_logs_entity
422                        FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE;
423                END IF;
424
425                -- logs -> rooms (optional)
426                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'fk_logs_room') THEN
427                    ALTER TABLE logs ADD CONSTRAINT fk_logs_room
428                        FOREIGN KEY (room_id) REFERENCES rooms(id) ON DELETE SET NULL;
429                END IF;
430            END $$;
431        "#,
432        )
433        .execute(&self.pool)
434        .await?;
435
436        // ============================================================
437        // ADD UNIQUE CONSTRAINTS
438        // ============================================================
439
440        // Unique constraint on components (entity, world, type, source_entity)
441        sqlx::query(
442            r#"
443            DO $$
444            BEGIN
445                IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'uq_components_entity_world_type_source') THEN
446                    ALTER TABLE components ADD CONSTRAINT uq_components_entity_world_type_source
447                        UNIQUE (entity_id, world_id, type, source_entity_id);
448                END IF;
449            END $$;
450        "#,
451        )
452        .execute(&self.pool)
453        .await?;
454
455        // ============================================================
456        // CREATE INDICES (comprehensive coverage)
457        // ============================================================
458
459        // -- Entities indices --
460        sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_agent_id ON entities(agent_id)")
461            .execute(&self.pool)
462            .await?;
463
464        sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_username ON entities(username) WHERE username IS NOT NULL")
465            .execute(&self.pool)
466            .await?;
467
468        // -- Worlds indices --
469        sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_agent_id ON worlds(agent_id)")
470            .execute(&self.pool)
471            .await?;
472
473        sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_server_id ON worlds(server_id) WHERE server_id IS NOT NULL")
474            .execute(&self.pool)
475            .await?;
476
477        // -- Rooms indices --
478        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_world_id ON rooms(world_id)")
479            .execute(&self.pool)
480            .await?;
481
482        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_agent_id ON rooms(agent_id) WHERE agent_id IS NOT NULL")
483            .execute(&self.pool)
484            .await?;
485
486        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_channel_id ON rooms(channel_id) WHERE channel_id IS NOT NULL")
487            .execute(&self.pool)
488            .await?;
489
490        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_source_server ON rooms(source, server_id) WHERE server_id IS NOT NULL")
491            .execute(&self.pool)
492            .await?;
493
494        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_type ON rooms(type)")
495            .execute(&self.pool)
496            .await?;
497
498        // -- Memories indices --
499        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id)")
500            .execute(&self.pool)
501            .await?;
502
503        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_room_id ON memories(room_id)")
504            .execute(&self.pool)
505            .await?;
506
507        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_entity_id ON memories(entity_id)")
508            .execute(&self.pool)
509            .await?;
510
511        sqlx::query(
512            "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at DESC)",
513        )
514        .execute(&self.pool)
515        .await?;
516
517        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_room_created ON memories(agent_id, room_id, created_at DESC)")
518            .execute(&self.pool)
519            .await?;
520
521        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_unique ON memories(agent_id, room_id) WHERE unique_flag = TRUE")
522            .execute(&self.pool)
523            .await?;
524
525        // -- Participants indices --
526        sqlx::query(
527            "CREATE INDEX IF NOT EXISTS idx_participants_entity_id ON participants(entity_id)",
528        )
529        .execute(&self.pool)
530        .await?;
531
532        sqlx::query("CREATE INDEX IF NOT EXISTS idx_participants_room_id ON participants(room_id)")
533            .execute(&self.pool)
534            .await?;
535
536        // -- Relationships indices --
537        sqlx::query(
538            "CREATE INDEX IF NOT EXISTS idx_relationships_entity_a ON relationships(entity_id_a)",
539        )
540        .execute(&self.pool)
541        .await?;
542
543        sqlx::query(
544            "CREATE INDEX IF NOT EXISTS idx_relationships_entity_b ON relationships(entity_id_b)",
545        )
546        .execute(&self.pool)
547        .await?;
548
549        sqlx::query(
550            "CREATE INDEX IF NOT EXISTS idx_relationships_agent_id ON relationships(agent_id)",
551        )
552        .execute(&self.pool)
553        .await?;
554
555        sqlx::query("CREATE INDEX IF NOT EXISTS idx_relationships_type ON relationships(type)")
556            .execute(&self.pool)
557            .await?;
558
559        // -- Components indices --
560        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_entity_id ON components(entity_id)")
561            .execute(&self.pool)
562            .await?;
563
564        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_world_id ON components(world_id)")
565            .execute(&self.pool)
566            .await?;
567
568        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_type ON components(type)")
569            .execute(&self.pool)
570            .await?;
571
572        sqlx::query(
573            "CREATE INDEX IF NOT EXISTS idx_components_entity_type ON components(entity_id, type)",
574        )
575        .execute(&self.pool)
576        .await?;
577
578        // -- Tasks indices --
579        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_id ON tasks(agent_id)")
580            .execute(&self.pool)
581            .await?;
582
583        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
584            .execute(&self.pool)
585            .await?;
586
587        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status_scheduled ON tasks(status, scheduled_at) WHERE status = 'PENDING'")
588            .execute(&self.pool)
589            .await?;
590
591        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_status ON tasks(agent_id, status)")
592            .execute(&self.pool)
593            .await?;
594
595        // -- Logs indices --
596        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_id ON logs(entity_id)")
597            .execute(&self.pool)
598            .await?;
599
600        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_room_id ON logs(room_id) WHERE room_id IS NOT NULL")
601            .execute(&self.pool)
602            .await?;
603
604        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_type ON logs(type)")
605            .execute(&self.pool)
606            .await?;
607
608        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_created_at ON logs(created_at DESC)")
609            .execute(&self.pool)
610            .await?;
611
612        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_created ON logs(entity_id, created_at DESC)")
613            .execute(&self.pool)
614            .await?;
615
616        // ============================================================
617        // OBSERVABILITY TABLES
618        // ============================================================
619
620        sqlx::query(
621            r#"
622            CREATE TABLE IF NOT EXISTS llm_costs (
623                id UUID PRIMARY KEY,
624                timestamp TIMESTAMPTZ NOT NULL,
625
626                agent_id UUID NOT NULL,
627                user_id TEXT,
628                conversation_id UUID,
629                action_name TEXT,
630                evaluator_name TEXT,
631
632                provider TEXT NOT NULL,
633                model TEXT NOT NULL,
634                temperature REAL NOT NULL,
635
636                prompt_tokens INTEGER NOT NULL,
637                completion_tokens INTEGER NOT NULL,
638                total_tokens INTEGER NOT NULL,
639                cached_tokens INTEGER,
640
641                input_cost_usd REAL NOT NULL,
642                output_cost_usd REAL NOT NULL,
643                total_cost_usd REAL NOT NULL,
644
645                latency_ms BIGINT NOT NULL,
646                ttft_ms BIGINT,
647
648                success BOOLEAN NOT NULL,
649                error TEXT,
650
651                prompt_hash TEXT,
652                prompt_preview TEXT
653            )
654        "#,
655        )
656        .execute(&self.pool)
657        .await?;
658
659        sqlx::query(
660            r#"
661            CREATE TABLE IF NOT EXISTS stored_prompts (
662                id UUID PRIMARY KEY,
663                timestamp TIMESTAMPTZ NOT NULL,
664                cost_record_id UUID REFERENCES llm_costs(id) ON DELETE CASCADE,
665                agent_id UUID NOT NULL,
666                conversation_id UUID,
667                prompt_hash TEXT NOT NULL,
668                prompt_text TEXT,
669                prompt_length INTEGER NOT NULL,
670                sanitized BOOLEAN NOT NULL,
671                sanitization_level TEXT NOT NULL,
672                completion_text TEXT,
673                completion_length INTEGER NOT NULL DEFAULT 0,
674                model TEXT NOT NULL,
675                temperature REAL NOT NULL
676            )
677        "#,
678        )
679        .execute(&self.pool)
680        .await?;
681
682        // Observability indices
683        sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_agent_timestamp ON llm_costs(agent_id, timestamp DESC)")
684            .execute(&self.pool)
685            .await?;
686
687        sqlx::query(
688            "CREATE INDEX IF NOT EXISTS idx_llm_costs_provider_model ON llm_costs(provider, model)",
689        )
690        .execute(&self.pool)
691        .await?;
692
693        sqlx::query(
694            "CREATE INDEX IF NOT EXISTS idx_llm_costs_timestamp ON llm_costs(timestamp DESC)",
695        )
696        .execute(&self.pool)
697        .await?;
698
699        sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_conversation ON llm_costs(conversation_id) WHERE conversation_id IS NOT NULL")
700            .execute(&self.pool)
701            .await?;
702
703        sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_agent_timestamp ON stored_prompts(agent_id, timestamp DESC)")
704            .execute(&self.pool)
705            .await?;
706
707        sqlx::query(
708            "CREATE INDEX IF NOT EXISTS idx_stored_prompts_hash ON stored_prompts(prompt_hash)",
709        )
710        .execute(&self.pool)
711        .await?;
712
713        sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_cost_record ON stored_prompts(cost_record_id)")
714            .execute(&self.pool)
715            .await?;
716
717        info!("Database schema initialized successfully");
718        Ok(())
719    }
720}
721
722#[async_trait]
723impl IDatabaseAdapter for PostgresAdapter {
724    fn db(&self) -> &dyn std::any::Any {
725        &self.pool
726    }
727
728    async fn initialize(&mut self, _config: Option<serde_json::Value>) -> Result<()> {
729        self.init_schema().await
730    }
731
732    async fn is_ready(&self) -> Result<bool> {
733        // Try a simple query
734        match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
735            Ok(_) => Ok(true),
736            Err(_) => Ok(false),
737        }
738    }
739
740    async fn close(&mut self) -> Result<()> {
741        self.pool.close().await;
742        Ok(())
743    }
744
745    async fn get_connection(&self) -> Result<Box<dyn std::any::Any + Send>> {
746        Ok(Box::new(self.pool.clone()))
747    }
748
749    async fn run_plugin_migrations(
750        &self,
751        plugins: Vec<PluginMigration>,
752        options: MigrationOptions,
753    ) -> Result<()> {
754        if plugins.is_empty() {
755            return Ok(());
756        }
757
758        for plugin in plugins {
759            if let Some(schema) = plugin.schema {
760                if options.verbose {
761                    info!("Applying schema for plugin '{}'", plugin.name);
762                }
763
764                // Expect object: { table_name: { type: "table", columns: { col: type, ... } }, ... }
765                let schema_obj = schema.as_object().ok_or_else(|| {
766                    ZoeyError::validation(format!(
767                        "Invalid schema for plugin '{}': expected JSON object",
768                        plugin.name
769                    ))
770                })?;
771
772                // Build dependency graph from REFERENCES clauses
773                use std::collections::{HashMap, HashSet, VecDeque};
774                let mut deps: HashMap<String, HashSet<String>> = HashMap::new();
775                let mut reverse_deps: HashMap<String, HashSet<String>> = HashMap::new();
776                let table_names: Vec<String> = schema_obj.keys().map(|k| k.to_string()).collect();
777
778                for (tname, tdef) in schema_obj.iter() {
779                    let tname = validate_identifier(tname)?;
780                    let columns_obj =
781                        tdef.get("columns")
782                            .and_then(|v| v.as_object())
783                            .ok_or_else(|| {
784                                ZoeyError::validation(format!(
785                                    "Invalid table definition for '{}': missing 'columns' object",
786                                    tname
787                                ))
788                            })?;
789                    for (_col_name, col_type_val) in columns_obj.iter() {
790                        if let Some(col_type) = col_type_val.as_str() {
791                            if let Some(idx) = col_type.find("REFERENCES") {
792                                let tail = &col_type[idx + "REFERENCES".len()..];
793                                let ref_name = tail
794                                    .trim()
795                                    .split(|c: char| c.is_whitespace() || c == '(')
796                                    .filter(|s| !s.is_empty())
797                                    .next()
798                                    .unwrap_or("");
799                                if !ref_name.is_empty() {
800                                    let ref_name = validate_identifier(ref_name)?;
801                                    if table_names.iter().any(|n| n == ref_name) {
802                                        deps.entry(tname.to_string())
803                                            .or_default()
804                                            .insert(ref_name.to_string());
805                                        reverse_deps
806                                            .entry(ref_name.to_string())
807                                            .or_default()
808                                            .insert(tname.to_string());
809                                    }
810                                }
811                            }
812                        }
813                    }
814                }
815
816                // Kahn's algorithm for topological sort
817                let mut indegree: HashMap<String, usize> = HashMap::new();
818                for t in &table_names {
819                    let d = deps.get(t).map(|s| s.len()).unwrap_or(0);
820                    indegree.insert(t.clone(), d);
821                }
822                let mut q: VecDeque<String> = table_names
823                    .iter()
824                    .filter(|t| indegree.get(*t).copied().unwrap_or(0) == 0)
825                    .cloned()
826                    .collect();
827                let mut order: Vec<String> = Vec::new();
828                while let Some(n) = q.pop_front() {
829                    order.push(n.clone());
830                    if let Some(children) = reverse_deps.get(&n) {
831                        for c in children {
832                            if let Some(e) = indegree.get_mut(c) {
833                                if *e > 0 {
834                                    *e -= 1;
835                                    if *e == 0 {
836                                        q.push_back(c.clone());
837                                    }
838                                }
839                            }
840                        }
841                    }
842                }
843                // Append any remaining (cycles or external deps), keeping original order
844                for t in &table_names {
845                    if !order.contains(t) {
846                        order.push(t.clone());
847                    }
848                }
849
850                // Create tables in sorted order
851                for table_name in order {
852                    let table_def = &schema_obj[&table_name];
853                    let columns_obj = table_def
854                        .get("columns")
855                        .and_then(|v| v.as_object())
856                        .ok_or_else(|| {
857                            ZoeyError::validation(format!(
858                                "Invalid table definition for '{}': missing 'columns' object",
859                                table_name
860                            ))
861                        })?;
862
863                    let mut cols_sql: Vec<String> = Vec::new();
864                    for (col_name, col_type_val) in columns_obj.iter() {
865                        let col_name = validate_identifier(col_name)?;
866                        let col_type = col_type_val.as_str().ok_or_else(|| {
867                            ZoeyError::validation(format!(
868                                "Invalid type for column '{}.{}'",
869                                table_name, col_name
870                            ))
871                        })?;
872
873                        let allowed_chars = |c: char| {
874                            c.is_ascii_alphanumeric()
875                                || matches!(c, ' ' | '_' | '(' | ')' | ',' | '-' | ':')
876                        };
877                        if col_type.len() > 128 || !col_type.chars().all(allowed_chars) {
878                            return Err(ZoeyError::validation(format!(
879                                "Unsupported or unsafe type for '{}.{}': {}",
880                                table_name, col_name, col_type
881                            )));
882                        }
883                        cols_sql.push(format!("{} {}", col_name, col_type));
884                    }
885
886                    let create_sql = format!(
887                        "CREATE TABLE IF NOT EXISTS {} ({})",
888                        table_name,
889                        cols_sql.join(", ")
890                    );
891
892                    if options.verbose {
893                        debug!("{}", create_sql);
894                    }
895
896                    if !options.dry_run {
897                        sqlx::query(&create_sql).execute(&self.pool).await?;
898                    }
899                }
900
901                if options.verbose {
902                    info!("✓ Schema applied for plugin '{}'", plugin.name);
903                }
904            }
905        }
906        Ok(())
907    }
908
909    // Agent operations
910    async fn get_agent(&self, agent_id: UUID) -> Result<Option<Agent>> {
911        let row = sqlx::query(
912            "SELECT id, name, character, created_at, updated_at FROM agents WHERE id = $1",
913        )
914        .bind(agent_id)
915        .fetch_optional(&self.pool)
916        .await?;
917
918        match row {
919            Some(row) => Ok(Some(Agent {
920                id: row.get("id"),
921                name: row.get("name"),
922                character: row.get("character"),
923                created_at: row.get("created_at"),
924                updated_at: row.get("updated_at"),
925            })),
926            None => Ok(None),
927        }
928    }
929
930    async fn get_agents(&self) -> Result<Vec<Agent>> {
931        let rows = sqlx::query("SELECT id, name, character, created_at, updated_at FROM agents")
932            .fetch_all(&self.pool)
933            .await?;
934
935        Ok(rows
936            .into_iter()
937            .map(|row| Agent {
938                id: row.get("id"),
939                name: row.get("name"),
940                character: row.get("character"),
941                created_at: row.get("created_at"),
942                updated_at: row.get("updated_at"),
943            })
944            .collect())
945    }
946
947    async fn create_agent(&self, agent: &Agent) -> Result<bool> {
948        let result = sqlx::query(
949            "INSERT INTO agents (id, name, character, created_at, updated_at) 
950             VALUES ($1, $2, $3, $4, $5)",
951        )
952        .bind(&agent.id)
953        .bind(&agent.name)
954        .bind(&agent.character)
955        .bind(&agent.created_at)
956        .bind(&agent.updated_at)
957        .execute(&self.pool)
958        .await?;
959
960        Ok(result.rows_affected() > 0)
961    }
962
963    async fn update_agent(&self, agent_id: UUID, agent: &Agent) -> Result<bool> {
964        let result = sqlx::query(
965            "UPDATE agents SET name = $2, character = $3, updated_at = $4 WHERE id = $1",
966        )
967        .bind(agent_id)
968        .bind(&agent.name)
969        .bind(&agent.character)
970        .bind(chrono::Utc::now().timestamp())
971        .execute(&self.pool)
972        .await?;
973
974        Ok(result.rows_affected() > 0)
975    }
976
977    async fn delete_agent(&self, agent_id: UUID) -> Result<bool> {
978        let result = sqlx::query("DELETE FROM agents WHERE id = $1")
979            .bind(agent_id)
980            .execute(&self.pool)
981            .await?;
982
983        Ok(result.rows_affected() > 0)
984    }
985
986    async fn ensure_embedding_dimension(&self, dimension: usize) -> Result<()> {
987        // Enable pgvector extension if not already enabled
988        sqlx::query("CREATE EXTENSION IF NOT EXISTS vector")
989            .execute(&self.pool)
990            .await?;
991
992        *self.embedding_dimension.write().unwrap() = dimension;
993        Ok(())
994    }
995
996    async fn get_entities_by_ids(&self, entity_ids: Vec<UUID>) -> Result<Vec<Entity>> {
997        if entity_ids.is_empty() {
998            return Ok(vec![]);
999        }
1000        let rows = sqlx::query(
1001            "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id = ANY($1)"
1002        )
1003        .bind(entity_ids)
1004        .fetch_all(&self.pool)
1005        .await?;
1006
1007        Ok(rows
1008            .into_iter()
1009            .map(|row| Entity {
1010                id: row.get("id"),
1011                agent_id: row.get("agent_id"),
1012                name: row.get("name"),
1013                username: row.get("username"),
1014                email: row.get("email"),
1015                avatar_url: row.get("avatar_url"),
1016                metadata: row
1017                    .try_get("metadata")
1018                    .ok()
1019                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1020                    .unwrap_or_default(),
1021                created_at: row.get("created_at"),
1022            })
1023            .collect())
1024    }
1025
1026    async fn get_entities_for_room(
1027        &self,
1028        room_id: UUID,
1029        _include_components: bool,
1030    ) -> Result<Vec<Entity>> {
1031        let rows = sqlx::query(
1032            "SELECT e.id, e.agent_id, e.name, e.username, e.email, e.avatar_url, e.metadata, e.created_at
1033             FROM participants p JOIN entities e ON e.id = p.entity_id WHERE p.room_id = $1"
1034        )
1035        .bind(room_id)
1036        .fetch_all(&self.pool)
1037        .await?;
1038
1039        Ok(rows
1040            .into_iter()
1041            .map(|row| Entity {
1042                id: row.get("id"),
1043                agent_id: row.get("agent_id"),
1044                name: row.get("name"),
1045                username: row.get("username"),
1046                email: row.get("email"),
1047                avatar_url: row.get("avatar_url"),
1048                metadata: row
1049                    .try_get("metadata")
1050                    .ok()
1051                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1052                    .unwrap_or_default(),
1053                created_at: row.get("created_at"),
1054            })
1055            .collect())
1056    }
1057
1058    async fn create_entities(&self, entities: Vec<Entity>) -> Result<bool> {
1059        for entity in entities {
1060            sqlx::query(
1061                "INSERT INTO entities (id, agent_id, name, username, email, avatar_url, metadata, created_at)
1062                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1063                 ON CONFLICT (id) DO UPDATE SET
1064                    name = EXCLUDED.name,
1065                    username = EXCLUDED.username,
1066                    email = EXCLUDED.email,
1067                    avatar_url = EXCLUDED.avatar_url,
1068                    metadata = EXCLUDED.metadata"
1069            )
1070            .bind(entity.id)
1071            .bind(entity.agent_id)
1072            .bind(&entity.name)
1073            .bind(&entity.username)
1074            .bind(&entity.email)
1075            .bind(&entity.avatar_url)
1076            .bind(serde_json::to_value(&entity.metadata)?)
1077            .bind(entity.created_at)
1078            .execute(&self.pool)
1079            .await?;
1080        }
1081        Ok(true)
1082    }
1083
1084    async fn update_entity(&self, entity: &Entity) -> Result<()> {
1085        sqlx::query(
1086            "UPDATE entities SET name = $2, username = $3, email = $4, avatar_url = $5, metadata = $6, created_at = $7 WHERE id = $1"
1087        )
1088        .bind(entity.id)
1089        .bind(&entity.name)
1090        .bind(&entity.username)
1091        .bind(&entity.email)
1092        .bind(&entity.avatar_url)
1093        .bind(serde_json::to_value(&entity.metadata)?)
1094        .bind(entity.created_at)
1095        .execute(&self.pool)
1096        .await?;
1097        Ok(())
1098    }
1099
1100    async fn get_entity_by_id(&self, entity_id: UUID) -> Result<Option<Entity>> {
1101        let row = sqlx::query(
1102            "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at
1103             FROM entities WHERE id = $1",
1104        )
1105        .bind(entity_id)
1106        .fetch_optional(&self.pool)
1107        .await?;
1108
1109        match row {
1110            Some(row) => Ok(Some(Entity {
1111                id: row.get("id"),
1112                agent_id: row.get("agent_id"),
1113                name: row.get("name"),
1114                username: row.get("username"),
1115                email: row.get("email"),
1116                avatar_url: row.get("avatar_url"),
1117                metadata: row
1118                    .try_get("metadata")
1119                    .ok()
1120                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1121                    .unwrap_or_default(),
1122                created_at: row.get("created_at"),
1123            })),
1124            None => Ok(None),
1125        }
1126    }
1127
1128    async fn get_component(
1129        &self,
1130        entity_id: UUID,
1131        component_type: &str,
1132        world_id: Option<UUID>,
1133        source_entity_id: Option<UUID>,
1134    ) -> Result<Option<Component>> {
1135        let mut query = String::from(
1136            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = $1 AND type = $2"
1137        );
1138        let mut idx = 3;
1139        if world_id.is_some() {
1140            query.push_str(&format!(" AND world_id = ${}", idx));
1141            idx += 1;
1142        }
1143        if source_entity_id.is_some() {
1144            query.push_str(&format!(" AND source_entity_id = ${}", idx));
1145        }
1146
1147        let mut sql = sqlx::query(&query).bind(entity_id).bind(component_type);
1148        if let Some(wid) = world_id {
1149            sql = sql.bind(wid);
1150        }
1151        if let Some(seid) = source_entity_id {
1152            sql = sql.bind(seid);
1153        }
1154
1155        let row = sql.fetch_optional(&self.pool).await?;
1156        Ok(row.map(|row| Component {
1157            id: row.get("id"),
1158            entity_id: row.get("entity_id"),
1159            world_id: row.get("world_id"),
1160            source_entity_id: row.get("source_entity_id"),
1161            component_type: row.get("type"),
1162            data: row.get("data"),
1163            created_at: row.get("created_at"),
1164            updated_at: row.get("updated_at"),
1165        }))
1166    }
1167
1168    async fn get_components(
1169        &self,
1170        entity_id: UUID,
1171        world_id: Option<UUID>,
1172        source_entity_id: Option<UUID>,
1173    ) -> Result<Vec<Component>> {
1174        let mut query = String::from(
1175            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = $1"
1176        );
1177        let mut idx = 2;
1178        if world_id.is_some() {
1179            query.push_str(&format!(" AND world_id = ${}", idx));
1180            idx += 1;
1181        }
1182        if source_entity_id.is_some() {
1183            query.push_str(&format!(" AND source_entity_id = ${}", idx));
1184        }
1185        let mut sql = sqlx::query(&query).bind(entity_id);
1186        if let Some(wid) = world_id {
1187            sql = sql.bind(wid);
1188        }
1189        if let Some(seid) = source_entity_id {
1190            sql = sql.bind(seid);
1191        }
1192        let rows = sql.fetch_all(&self.pool).await?;
1193        Ok(rows
1194            .into_iter()
1195            .map(|row| Component {
1196                id: row.get("id"),
1197                entity_id: row.get("entity_id"),
1198                world_id: row.get("world_id"),
1199                source_entity_id: row.get("source_entity_id"),
1200                component_type: row.get("type"),
1201                data: row.get("data"),
1202                created_at: row.get("created_at"),
1203                updated_at: row.get("updated_at"),
1204            })
1205            .collect())
1206    }
1207
1208    async fn create_component(&self, component: &Component) -> Result<bool> {
1209        let result = sqlx::query(
1210            "INSERT INTO components (id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at)
1211             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
1212        )
1213        .bind(component.id)
1214        .bind(component.entity_id)
1215        .bind(component.world_id)
1216        .bind(component.source_entity_id)
1217        .bind(&component.component_type)
1218        .bind(&component.data)
1219        .bind(component.created_at)
1220        .bind(component.updated_at)
1221        .execute(&self.pool)
1222        .await?;
1223        Ok(result.rows_affected() > 0)
1224    }
1225
1226    async fn update_component(&self, component: &Component) -> Result<()> {
1227        sqlx::query("UPDATE components SET data = $2, updated_at = $3 WHERE id = $1")
1228            .bind(component.id)
1229            .bind(&component.data)
1230            .bind(chrono::Utc::now().timestamp())
1231            .execute(&self.pool)
1232            .await?;
1233        Ok(())
1234    }
1235
1236    async fn delete_component(&self, component_id: UUID) -> Result<()> {
1237        sqlx::query("DELETE FROM components WHERE id = $1")
1238            .bind(component_id)
1239            .execute(&self.pool)
1240            .await?;
1241        Ok(())
1242    }
1243
1244    async fn get_memories(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1245        let mut query = String::from("SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag FROM memories WHERE 1=1");
1246        let mut param_index = 1;
1247
1248        if let Some(agent_id) = params.agent_id {
1249            query.push_str(&format!(" AND agent_id = ${}", param_index));
1250            param_index += 1;
1251        }
1252        if let Some(room_id) = params.room_id {
1253            query.push_str(&format!(" AND room_id = ${}", param_index));
1254            param_index += 1;
1255        }
1256        if let Some(entity_id) = params.entity_id {
1257            query.push_str(&format!(" AND entity_id = ${}", param_index));
1258            param_index += 1;
1259        }
1260        if let Some(unique) = params.unique {
1261            query.push_str(&format!(" AND unique_flag = ${}", param_index));
1262            param_index += 1;
1263        }
1264
1265        query.push_str(" ORDER BY created_at DESC");
1266
1267        if let Some(count) = params.count {
1268            query.push_str(&format!(" LIMIT ${}", param_index));
1269            param_index += 1;
1270        }
1271
1272        let mut sql_query = sqlx::query(&query);
1273        let mut bind_idx = 1;
1274        if let Some(agent_id) = params.agent_id {
1275            sql_query = sql_query.bind(agent_id);
1276            bind_idx += 1;
1277        }
1278        if let Some(room_id) = params.room_id {
1279            sql_query = sql_query.bind(room_id);
1280            bind_idx += 1;
1281        }
1282        if let Some(entity_id) = params.entity_id {
1283            sql_query = sql_query.bind(entity_id);
1284            bind_idx += 1;
1285        }
1286        if let Some(unique) = params.unique {
1287            sql_query = sql_query.bind(unique);
1288            bind_idx += 1;
1289        }
1290        if let Some(count) = params.count {
1291            sql_query = sql_query.bind(count as i64);
1292        }
1293
1294        let rows = sql_query.fetch_all(&self.pool).await?;
1295
1296        let memories = rows
1297            .into_iter()
1298            .map(|row| Memory {
1299                id: row.get("id"),
1300                entity_id: row.get("entity_id"),
1301                agent_id: row.get("agent_id"),
1302                room_id: row.get("room_id"),
1303                content: serde_json::from_value(row.get("content")).unwrap_or_default(),
1304                embedding: row.get("embedding"),
1305                metadata: row
1306                    .try_get("metadata")
1307                    .ok()
1308                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok()),
1309                created_at: row.get("created_at"),
1310                unique: Some(row.get("unique_flag")),
1311                similarity: None,
1312            })
1313            .collect();
1314
1315        Ok(memories)
1316    }
1317
1318    async fn create_memory(&self, memory: &Memory, _table_name: &str) -> Result<UUID> {
1319        sqlx::query(
1320            "INSERT INTO memories (id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag)
1321             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"
1322        )
1323        .bind(memory.id)
1324        .bind(memory.entity_id)
1325        .bind(memory.agent_id)
1326        .bind(memory.room_id)
1327        .bind(serde_json::to_value(&memory.content)?)
1328        .bind(&memory.embedding)
1329        .bind(serde_json::to_value(&memory.metadata)?)
1330        .bind(memory.created_at)
1331        .bind(memory.unique.unwrap_or(false))
1332        .execute(&self.pool)
1333        .await?;
1334
1335        Ok(memory.id)
1336    }
1337
1338    async fn search_memories_by_embedding(
1339        &self,
1340        params: SearchMemoriesParams,
1341    ) -> Result<Vec<Memory>> {
1342        use tracing::{info, warn};
1343
1344        // Validate embedding dimension
1345        let expected_dim = *self.embedding_dimension.read().unwrap();
1346        let actual_dim = params.embedding.len();
1347
1348        if actual_dim != expected_dim {
1349            return Err(ZoeyError::vector_search(
1350                format!(
1351                    "Embedding dimension mismatch for table '{}'",
1352                    params.table_name
1353                ),
1354                actual_dim,
1355                expected_dim,
1356            ));
1357        }
1358
1359        // Validate table name to prevent SQL injection
1360        let validated_table = validate_table_name(&params.table_name)?;
1361
1362        info!(
1363            "Searching memories by embedding in table '{}' with {} dimensions, limit: {}",
1364            validated_table, actual_dim, params.count
1365        );
1366
1367        // Build query with optional filters
1368        let mut query = format!(
1369            "SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag,
1370             embedding <-> $1::vector AS similarity
1371             FROM {}
1372             WHERE embedding IS NOT NULL",
1373            validated_table
1374        );
1375
1376        let mut param_count = 2;
1377        if params.agent_id.is_some() {
1378            query.push_str(&format!(" AND agent_id = ${}", param_count));
1379            param_count += 1;
1380        }
1381        if params.room_id.is_some() {
1382            query.push_str(&format!(" AND room_id = ${}", param_count));
1383            param_count += 1;
1384        }
1385        if params.world_id.is_some() {
1386            query.push_str(&format!(" AND world_id = ${}", param_count));
1387            param_count += 1;
1388        }
1389        if params.entity_id.is_some() {
1390            query.push_str(&format!(" AND entity_id = ${}", param_count));
1391            param_count += 1;
1392        }
1393
1394        query.push_str(" ORDER BY similarity ASC");
1395        query.push_str(&format!(" LIMIT ${}", param_count));
1396
1397        // Execute query with dynamic parameter binding
1398        let embedding_str = format!(
1399            "[{}]",
1400            params
1401                .embedding
1402                .iter()
1403                .map(|v| v.to_string())
1404                .collect::<Vec<_>>()
1405                .join(",")
1406        );
1407
1408        let mut sql_query = sqlx::query(&query).bind(&embedding_str);
1409
1410        if let Some(agent_id) = params.agent_id {
1411            sql_query = sql_query.bind(agent_id);
1412        }
1413        if let Some(room_id) = params.room_id {
1414            sql_query = sql_query.bind(room_id);
1415        }
1416        if let Some(world_id) = params.world_id {
1417            sql_query = sql_query.bind(world_id);
1418        }
1419        if let Some(entity_id) = params.entity_id {
1420            sql_query = sql_query.bind(entity_id);
1421        }
1422        sql_query = sql_query.bind(params.count as i64);
1423
1424        let rows = sql_query.fetch_all(&self.pool).await.map_err(|e| {
1425            warn!("Vector search failed: {}", e);
1426            ZoeyError::database(format!(
1427                "Vector search failed in table '{}': {}",
1428                params.table_name, e
1429            ))
1430        })?;
1431
1432        let mut memories = Vec::new();
1433        for row in rows {
1434            let id: uuid::Uuid = row.get("id");
1435            let entity_id: uuid::Uuid = row.get("entity_id");
1436            let agent_id: uuid::Uuid = row.get("agent_id");
1437            let room_id: uuid::Uuid = row.get("room_id");
1438            let content_value: serde_json::Value = row.get("content");
1439            let metadata_value: Option<serde_json::Value> = row.get("metadata");
1440            let created_at: i64 = row.get("created_at");
1441            let unique_flag: Option<bool> = row.get("unique_flag");
1442            let similarity: f32 = row.get("similarity");
1443
1444            let content: Content = serde_json::from_value(content_value)?;
1445            let metadata: Option<MemoryMetadata> = metadata_value
1446                .map(|v| serde_json::from_value(v))
1447                .transpose()?;
1448
1449            memories.push(Memory {
1450                id,
1451                entity_id,
1452                agent_id,
1453                room_id,
1454                content,
1455                embedding: None, // Don't return embeddings by default to save bandwidth
1456                metadata,
1457                created_at,
1458                unique: unique_flag,
1459                similarity: Some(similarity), // Include similarity score
1460            });
1461        }
1462
1463        info!(
1464            "Found {} memories similar to query embedding",
1465            memories.len()
1466        );
1467        Ok(memories)
1468    }
1469
1470    async fn get_cached_embeddings(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1471        use tracing::{debug, info};
1472
1473        debug!(
1474            "Getting cached embeddings for agent_id: {:?}, room_id: {:?}, count: {:?}",
1475            params.agent_id, params.room_id, params.count
1476        );
1477
1478        // Validate table name to prevent SQL injection
1479        let validated_table = validate_table_name(&params.table_name)?;
1480
1481        // Build query for memories with embeddings
1482        let mut query = format!(
1483            "SELECT id, entity_id, agent_id, room_id, content, embedding, metadata, created_at, unique_flag
1484             FROM {}
1485             WHERE embedding IS NOT NULL",
1486            validated_table
1487        );
1488
1489        let mut param_count = 1;
1490        if params.agent_id.is_some() {
1491            query.push_str(&format!(" AND agent_id = ${}", param_count));
1492            param_count += 1;
1493        }
1494        if params.room_id.is_some() {
1495            query.push_str(&format!(" AND room_id = ${}", param_count));
1496            param_count += 1;
1497        }
1498        if params.entity_id.is_some() {
1499            query.push_str(&format!(" AND entity_id = ${}", param_count));
1500            param_count += 1;
1501        }
1502        if params.unique.is_some() {
1503            query.push_str(&format!(" AND unique_flag = ${}", param_count));
1504            param_count += 1;
1505        }
1506
1507        query.push_str(" ORDER BY created_at DESC");
1508
1509        if let Some(limit) = params.count {
1510            query.push_str(&format!(" LIMIT ${}", param_count));
1511        }
1512
1513        // Execute query
1514        let mut sql_query = sqlx::query(&query);
1515
1516        if let Some(agent_id) = params.agent_id {
1517            sql_query = sql_query.bind(agent_id);
1518        }
1519        if let Some(room_id) = params.room_id {
1520            sql_query = sql_query.bind(room_id);
1521        }
1522        if let Some(entity_id) = params.entity_id {
1523            sql_query = sql_query.bind(entity_id);
1524        }
1525        if let Some(unique) = params.unique {
1526            sql_query = sql_query.bind(unique);
1527        }
1528        if let Some(limit) = params.count {
1529            sql_query = sql_query.bind(limit as i64);
1530        }
1531
1532        let rows = sql_query.fetch_all(&self.pool).await?;
1533
1534        let mut memories = Vec::new();
1535        for row in rows {
1536            let id: uuid::Uuid = row.get("id");
1537            let entity_id: uuid::Uuid = row.get("entity_id");
1538            let agent_id: uuid::Uuid = row.get("agent_id");
1539            let room_id: uuid::Uuid = row.get("room_id");
1540            let content_value: serde_json::Value = row.get("content");
1541            let embedding_str: Option<String> = row.get("embedding");
1542            let metadata_value: Option<serde_json::Value> = row.get("metadata");
1543            let created_at: i64 = row.get("created_at");
1544            let unique_flag: Option<bool> = row.get("unique_flag");
1545
1546            let content: Content = serde_json::from_value(content_value)?;
1547
1548            // Parse embedding if present
1549            let embedding = if let Some(emb_str) = embedding_str {
1550                // pgvector stores as "[1.0,2.0,3.0]" format
1551                let trimmed = emb_str.trim_matches(|c| c == '[' || c == ']');
1552                let vec: std::result::Result<Vec<f32>, _> = trimmed
1553                    .split(',')
1554                    .map(|s| s.trim().parse::<f32>())
1555                    .collect();
1556                Some(vec.map_err(|e| {
1557                    ZoeyError::database(format!("Failed to parse embedding: {}", e))
1558                })?)
1559            } else {
1560                None
1561            };
1562
1563            let metadata: Option<MemoryMetadata> = metadata_value
1564                .map(|v| serde_json::from_value(v))
1565                .transpose()?;
1566
1567            memories.push(Memory {
1568                id,
1569                entity_id,
1570                agent_id,
1571                room_id,
1572                content,
1573                embedding,
1574                metadata,
1575                created_at,
1576                unique: unique_flag,
1577                similarity: None,
1578            });
1579        }
1580
1581        info!(
1582            "Retrieved {} cached embeddings from {}",
1583            memories.len(),
1584            params.table_name
1585        );
1586        Ok(memories)
1587    }
1588
1589    async fn update_memory(&self, memory: &Memory) -> Result<bool> {
1590        let result = sqlx::query(
1591            "UPDATE memories SET content = $2, embedding = $3, metadata = $4 WHERE id = $1",
1592        )
1593        .bind(memory.id)
1594        .bind(serde_json::to_value(&memory.content)?)
1595        .bind(&memory.embedding)
1596        .bind(serde_json::to_value(&memory.metadata)?)
1597        .execute(&self.pool)
1598        .await?;
1599
1600        Ok(result.rows_affected() > 0)
1601    }
1602
1603    async fn remove_memory(&self, memory_id: UUID, _table_name: &str) -> Result<bool> {
1604        let result = sqlx::query("DELETE FROM memories WHERE id = $1")
1605            .bind(memory_id)
1606            .execute(&self.pool)
1607            .await?;
1608
1609        Ok(result.rows_affected() > 0)
1610    }
1611
1612    async fn remove_all_memories(&self, agent_id: UUID, _table_name: &str) -> Result<bool> {
1613        let result = sqlx::query("DELETE FROM memories WHERE agent_id = $1")
1614            .bind(agent_id)
1615            .execute(&self.pool)
1616            .await?;
1617
1618        Ok(result.rows_affected() > 0)
1619    }
1620
1621    async fn count_memories(&self, params: MemoryQuery) -> Result<usize> {
1622        let mut query = String::from("SELECT COUNT(*) as count FROM memories WHERE 1=1");
1623        let mut bind_idx = 1;
1624        let mut query_args = sqlx::postgres::PgArguments::default();
1625
1626        if let Some(agent_id) = params.agent_id {
1627            query.push_str(&format!(" AND agent_id = ${}", bind_idx));
1628            query_args
1629                .add(agent_id)
1630                .map_err(|e| ZoeyError::Database(e.to_string()))?;
1631            bind_idx += 1;
1632        }
1633        if let Some(room_id) = params.room_id {
1634            query.push_str(&format!(" AND room_id = ${}", bind_idx));
1635            query_args
1636                .add(room_id)
1637                .map_err(|e| ZoeyError::Database(e.to_string()))?;
1638            bind_idx += 1;
1639        }
1640        if let Some(entity_id) = params.entity_id {
1641            query.push_str(&format!(" AND entity_id = ${}", bind_idx));
1642            query_args
1643                .add(entity_id)
1644                .map_err(|e| ZoeyError::Database(e.to_string()))?;
1645            bind_idx += 1;
1646        }
1647        if let Some(unique) = params.unique {
1648            query.push_str(&format!(" AND unique_flag = ${}", bind_idx));
1649            query_args
1650                .add(unique)
1651                .map_err(|e| ZoeyError::Database(e.to_string()))?;
1652            bind_idx += 1;
1653        }
1654
1655        let row = sqlx::query_with(&query, query_args)
1656            .fetch_one(&self.pool)
1657            .await?;
1658
1659        let count: i64 = row.get("count");
1660        Ok(count as usize)
1661    }
1662
1663    async fn get_world(&self, world_id: UUID) -> Result<Option<World>> {
1664        let row = sqlx::query(
1665            "SELECT id, name, agent_id, server_id, metadata, created_at FROM worlds WHERE id = $1",
1666        )
1667        .bind(world_id)
1668        .fetch_optional(&self.pool)
1669        .await?;
1670        Ok(row.map(|row| World {
1671            id: row.get("id"),
1672            name: row.get("name"),
1673            agent_id: row.get("agent_id"),
1674            server_id: row.get("server_id"),
1675            metadata: row
1676                .try_get("metadata")
1677                .ok()
1678                .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1679                .unwrap_or_default(),
1680            created_at: row.get("created_at"),
1681        }))
1682    }
1683
1684    async fn ensure_world(&self, world: &World) -> Result<()> {
1685        sqlx::query(
1686            "INSERT INTO worlds (id, name, agent_id, server_id, metadata, created_at)
1687             VALUES ($1, $2, $3, $4, $5, $6)
1688             ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, server_id = EXCLUDED.server_id, metadata = EXCLUDED.metadata"
1689        )
1690        .bind(world.id)
1691        .bind(&world.name)
1692        .bind(world.agent_id)
1693        .bind(&world.server_id)
1694        .bind(serde_json::to_value(&world.metadata)?)
1695        .bind(world.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1696        .execute(&self.pool)
1697        .await?;
1698        Ok(())
1699    }
1700
1701    async fn get_room(&self, room_id: UUID) -> Result<Option<Room>> {
1702        let row = sqlx::query(
1703            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE id = $1"
1704        )
1705        .bind(room_id)
1706        .fetch_optional(&self.pool)
1707        .await?;
1708        Ok(row.map(|row| Room {
1709            id: row.get("id"),
1710            agent_id: row.get("agent_id"),
1711            name: row.get("name"),
1712            source: row.get("source"),
1713            channel_type: {
1714                let t: String = row.get("type");
1715                match t.as_str() {
1716                    "DM" => ChannelType::Dm,
1717                    "VOICE_DM" => ChannelType::VoiceDm,
1718                    "GROUP_DM" => ChannelType::GroupDm,
1719                    "GUILD_TEXT" => ChannelType::GuildText,
1720                    "GUILD_VOICE" => ChannelType::GuildVoice,
1721                    "THREAD" => ChannelType::Thread,
1722                    "FEED" => ChannelType::Feed,
1723                    "SELF" => ChannelType::SelfChannel,
1724                    "API" => ChannelType::Api,
1725                    "WORLD" => ChannelType::World,
1726                    _ => ChannelType::Unknown,
1727                }
1728            },
1729            channel_id: row.get("channel_id"),
1730            server_id: row.get("server_id"),
1731            world_id: row.get("world_id"),
1732            metadata: row
1733                .try_get("metadata")
1734                .ok()
1735                .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1736                .unwrap_or_default(),
1737            created_at: row.get("created_at"),
1738        }))
1739    }
1740
1741    async fn create_room(&self, room: &Room) -> Result<UUID> {
1742        sqlx::query(
1743            "INSERT INTO rooms (id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at)
1744             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)"
1745        )
1746        .bind(room.id)
1747        .bind(room.agent_id)
1748        .bind(&room.name)
1749        .bind(&room.source)
1750        .bind(match room.channel_type {
1751            ChannelType::Dm => "DM",
1752            ChannelType::VoiceDm => "VOICE_DM",
1753            ChannelType::GroupDm => "GROUP_DM",
1754            ChannelType::GuildText => "GUILD_TEXT",
1755            ChannelType::GuildVoice => "GUILD_VOICE",
1756            ChannelType::Thread => "THREAD",
1757            ChannelType::Feed => "FEED",
1758            ChannelType::SelfChannel => "SELF",
1759            ChannelType::Api => "API",
1760            ChannelType::World => "WORLD",
1761            ChannelType::Unknown => "UNKNOWN",
1762        })
1763        .bind(&room.channel_id)
1764        .bind(&room.server_id)
1765        .bind(room.world_id)
1766        .bind(serde_json::to_value(&room.metadata)?)
1767        .bind(room.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1768        .execute(&self.pool)
1769        .await?;
1770        Ok(room.id)
1771    }
1772
1773    async fn get_rooms(&self, world_id: UUID) -> Result<Vec<Room>> {
1774        let rows = sqlx::query(
1775            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE world_id = $1"
1776        )
1777        .bind(world_id)
1778        .fetch_all(&self.pool)
1779        .await?;
1780        Ok(rows
1781            .into_iter()
1782            .map(|row| Room {
1783                id: row.get("id"),
1784                agent_id: row.get("agent_id"),
1785                name: row.get("name"),
1786                source: row.get("source"),
1787                channel_type: {
1788                    let t: String = row.get("type");
1789                    match t.as_str() {
1790                        "DM" => ChannelType::Dm,
1791                        "VOICE_DM" => ChannelType::VoiceDm,
1792                        "GROUP_DM" => ChannelType::GroupDm,
1793                        "GUILD_TEXT" => ChannelType::GuildText,
1794                        "GUILD_VOICE" => ChannelType::GuildVoice,
1795                        "THREAD" => ChannelType::Thread,
1796                        "FEED" => ChannelType::Feed,
1797                        "SELF" => ChannelType::SelfChannel,
1798                        "API" => ChannelType::Api,
1799                        "WORLD" => ChannelType::World,
1800                        _ => ChannelType::Unknown,
1801                    }
1802                },
1803                channel_id: row.get("channel_id"),
1804                server_id: row.get("server_id"),
1805                world_id: row.get("world_id"),
1806                metadata: row
1807                    .try_get("metadata")
1808                    .ok()
1809                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1810                    .unwrap_or_default(),
1811                created_at: row.get("created_at"),
1812            })
1813            .collect())
1814    }
1815
1816    async fn get_rooms_for_agent(&self, agent_id: UUID) -> Result<Vec<Room>> {
1817        let rows = sqlx::query(
1818            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE agent_id = $1"
1819        )
1820        .bind(agent_id)
1821        .fetch_all(&self.pool)
1822        .await?;
1823        Ok(rows
1824            .into_iter()
1825            .map(|row| Room {
1826                id: row.get("id"),
1827                agent_id: row.get("agent_id"),
1828                name: row.get("name"),
1829                source: row.get("source"),
1830                channel_type: {
1831                    let t: String = row.get("type");
1832                    match t.as_str() {
1833                        "DM" => ChannelType::Dm,
1834                        "VOICE_DM" => ChannelType::VoiceDm,
1835                        "GROUP_DM" => ChannelType::GroupDm,
1836                        "GUILD_TEXT" => ChannelType::GuildText,
1837                        "GUILD_VOICE" => ChannelType::GuildVoice,
1838                        "THREAD" => ChannelType::Thread,
1839                        "FEED" => ChannelType::Feed,
1840                        "SELF" => ChannelType::SelfChannel,
1841                        "API" => ChannelType::Api,
1842                        "WORLD" => ChannelType::World,
1843                        _ => ChannelType::Unknown,
1844                    }
1845                },
1846                channel_id: row.get("channel_id"),
1847                server_id: row.get("server_id"),
1848                world_id: row.get("world_id"),
1849                metadata: row
1850                    .try_get("metadata")
1851                    .ok()
1852                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1853                    .unwrap_or_default(),
1854                created_at: row.get("created_at"),
1855            })
1856            .collect())
1857    }
1858
1859    async fn add_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1860        let result = sqlx::query(
1861            "INSERT INTO participants (entity_id, room_id, joined_at, metadata) VALUES ($1, $2, $3, $4)"
1862        )
1863        .bind(entity_id)
1864        .bind(room_id)
1865        .bind(chrono::Utc::now().timestamp())
1866        .bind(serde_json::json!({}))
1867        .execute(&self.pool)
1868        .await?;
1869        Ok(result.rows_affected() > 0)
1870    }
1871
1872    async fn remove_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1873        let result = sqlx::query("DELETE FROM participants WHERE entity_id = $1 AND room_id = $2")
1874            .bind(entity_id)
1875            .bind(room_id)
1876            .execute(&self.pool)
1877            .await?;
1878        Ok(result.rows_affected() > 0)
1879    }
1880
1881    async fn get_participants(&self, room_id: UUID) -> Result<Vec<Participant>> {
1882        let rows = sqlx::query(
1883            "SELECT entity_id, room_id, joined_at, metadata FROM participants WHERE room_id = $1",
1884        )
1885        .bind(room_id)
1886        .fetch_all(&self.pool)
1887        .await?;
1888        Ok(rows
1889            .into_iter()
1890            .map(|row| Participant {
1891                entity_id: row.get("entity_id"),
1892                room_id: row.get("room_id"),
1893                joined_at: row.get("joined_at"),
1894                metadata: row
1895                    .try_get("metadata")
1896                    .ok()
1897                    .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1898                    .unwrap_or_default(),
1899            })
1900            .collect())
1901    }
1902
1903    async fn create_relationship(&self, relationship: &Relationship) -> Result<bool> {
1904        let result = sqlx::query(
1905            "INSERT INTO relationships (entity_id_a, entity_id_b, type, agent_id, metadata, created_at)
1906             VALUES ($1, $2, $3, $4, $5, $6)"
1907        )
1908        .bind(relationship.entity_id_a)
1909        .bind(relationship.entity_id_b)
1910        .bind(&relationship.relationship_type)
1911        .bind(relationship.agent_id)
1912        .bind(serde_json::to_value(&relationship.metadata)?)
1913        .bind(relationship.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1914        .execute(&self.pool)
1915        .await?;
1916        Ok(result.rows_affected() > 0)
1917    }
1918
1919    async fn get_relationship(
1920        &self,
1921        entity_id_a: UUID,
1922        entity_id_b: UUID,
1923    ) -> Result<Option<Relationship>> {
1924        let row = sqlx::query(
1925            "SELECT entity_id_a, entity_id_b, type, agent_id, metadata, created_at
1926             FROM relationships WHERE (entity_id_a = $1 AND entity_id_b = $2) OR (entity_id_a = $2 AND entity_id_b = $1)"
1927        )
1928        .bind(entity_id_a)
1929        .bind(entity_id_b)
1930        .fetch_optional(&self.pool)
1931        .await?;
1932        Ok(row.map(|row| Relationship {
1933            entity_id_a: row.get("entity_id_a"),
1934            entity_id_b: row.get("entity_id_b"),
1935            relationship_type: row.get("type"),
1936            agent_id: row.get("agent_id"),
1937            metadata: row
1938                .try_get("metadata")
1939                .ok()
1940                .and_then(|v: serde_json::Value| serde_json::from_value(v).ok())
1941                .unwrap_or_default(),
1942            created_at: row.get("created_at"),
1943        }))
1944    }
1945
1946    async fn create_task(&self, task: &Task) -> Result<UUID> {
1947        let status_str = match task.status {
1948            TaskStatus::Pending => "PENDING",
1949            TaskStatus::Running => "RUNNING",
1950            TaskStatus::Completed => "COMPLETED",
1951            TaskStatus::Failed => "FAILED",
1952            TaskStatus::Cancelled => "CANCELLED",
1953        };
1954        sqlx::query(
1955            "INSERT INTO tasks (id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at)
1956             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
1957        )
1958        .bind(task.id)
1959        .bind(task.agent_id)
1960        .bind(&task.task_type)
1961        .bind(&task.data)
1962        .bind(status_str)
1963        .bind(task.priority)
1964        .bind(task.scheduled_at)
1965        .bind(task.executed_at)
1966        .bind(task.retry_count)
1967        .bind(task.max_retries)
1968        .bind(&task.error)
1969        .bind(task.created_at)
1970        .bind(task.updated_at)
1971        .execute(&self.pool)
1972        .await?;
1973        Ok(task.id)
1974    }
1975
1976    async fn update_task(&self, task: &Task) -> Result<bool> {
1977        let status_str = match task.status {
1978            TaskStatus::Pending => "PENDING",
1979            TaskStatus::Running => "RUNNING",
1980            TaskStatus::Completed => "COMPLETED",
1981            TaskStatus::Failed => "FAILED",
1982            TaskStatus::Cancelled => "CANCELLED",
1983        };
1984        let result = sqlx::query(
1985            "UPDATE tasks SET data = $2, status = $3, priority = $4, scheduled_at = $5, executed_at = $6, retry_count = $7, max_retries = $8, error = $9, updated_at = $10 WHERE id = $1"
1986        )
1987        .bind(task.id)
1988        .bind(&task.data)
1989        .bind(status_str)
1990        .bind(task.priority)
1991        .bind(task.scheduled_at)
1992        .bind(task.executed_at)
1993        .bind(task.retry_count)
1994        .bind(task.max_retries)
1995        .bind(&task.error)
1996        .bind(task.updated_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1997        .execute(&self.pool)
1998        .await?;
1999        Ok(result.rows_affected() > 0)
2000    }
2001
2002    async fn get_task(&self, task_id: UUID) -> Result<Option<Task>> {
2003        let row = sqlx::query(
2004            "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at FROM tasks WHERE id = $1"
2005        )
2006        .bind(task_id)
2007        .fetch_optional(&self.pool)
2008        .await?;
2009        Ok(row.map(|row| {
2010            let status_str: String = row.get("status");
2011            let status = match status_str.as_str() {
2012                "PENDING" => TaskStatus::Pending,
2013                "RUNNING" => TaskStatus::Running,
2014                "COMPLETED" => TaskStatus::Completed,
2015                "FAILED" => TaskStatus::Failed,
2016                "CANCELLED" => TaskStatus::Cancelled,
2017                _ => TaskStatus::Pending,
2018            };
2019            Task {
2020                id: row.get("id"),
2021                agent_id: row.get("agent_id"),
2022                task_type: row.get("task_type"),
2023                data: row.get("data"),
2024                status,
2025                priority: row.get("priority"),
2026                scheduled_at: row.get("scheduled_at"),
2027                executed_at: row.get("executed_at"),
2028                retry_count: row.get("retry_count"),
2029                max_retries: row.get("max_retries"),
2030                error: row.get("error"),
2031                created_at: row.get("created_at"),
2032                updated_at: row.get("updated_at"),
2033            }
2034        }))
2035    }
2036
2037    async fn get_pending_tasks(&self, agent_id: UUID) -> Result<Vec<Task>> {
2038        let rows = sqlx::query(
2039            "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at
2040             FROM tasks WHERE agent_id = $1 AND status = 'PENDING' ORDER BY scheduled_at NULLS LAST, created_at ASC"
2041        )
2042        .bind(agent_id)
2043        .fetch_all(&self.pool)
2044        .await?;
2045        Ok(rows
2046            .into_iter()
2047            .map(|row| Task {
2048                id: row.get("id"),
2049                agent_id: row.get("agent_id"),
2050                task_type: row.get("task_type"),
2051                data: row.get("data"),
2052                status: TaskStatus::Pending,
2053                priority: row.get("priority"),
2054                scheduled_at: row.get("scheduled_at"),
2055                executed_at: row.get("executed_at"),
2056                retry_count: row.get("retry_count"),
2057                max_retries: row.get("max_retries"),
2058                error: row.get("error"),
2059                created_at: row.get("created_at"),
2060                updated_at: row.get("updated_at"),
2061            })
2062            .collect())
2063    }
2064
2065    async fn log(&self, log: &Log) -> Result<()> {
2066        sqlx::query(
2067            "INSERT INTO logs (entity_id, room_id, body, type, created_at)
2068             VALUES ($1, $2, $3, $4, $5)",
2069        )
2070        .bind(log.entity_id)
2071        .bind(log.room_id)
2072        .bind(&log.body)
2073        .bind(&log.log_type)
2074        .bind(log.created_at)
2075        .execute(&self.pool)
2076        .await?;
2077
2078        Ok(())
2079    }
2080
2081    async fn get_logs(&self, params: LogQuery) -> Result<Vec<Log>> {
2082        let mut query = String::from(
2083            "SELECT id, entity_id, room_id, body, type, created_at FROM logs WHERE 1=1",
2084        );
2085        let mut idx = 1;
2086        if params.entity_id.is_some() {
2087            query.push_str(&format!(" AND entity_id = ${}", idx));
2088            idx += 1;
2089        }
2090        if params.room_id.is_some() {
2091            query.push_str(&format!(" AND room_id = ${}", idx));
2092            idx += 1;
2093        }
2094        if params.log_type.is_some() {
2095            query.push_str(&format!(" AND type = ${}", idx));
2096            idx += 1;
2097        }
2098        query.push_str(" ORDER BY created_at DESC");
2099        if params.limit.is_some() {
2100            query.push_str(&format!(" LIMIT ${}", idx));
2101            idx += 1;
2102        }
2103        if params.offset.is_some() {
2104            query.push_str(&format!(" OFFSET ${}", idx));
2105        }
2106        let mut sql = sqlx::query(&query);
2107        if let Some(eid) = params.entity_id {
2108            sql = sql.bind(eid);
2109        }
2110        if let Some(rid) = params.room_id {
2111            sql = sql.bind(rid);
2112        }
2113        if let Some(t) = params.log_type {
2114            sql = sql.bind(t);
2115        }
2116        if let Some(l) = params.limit {
2117            sql = sql.bind(l as i64);
2118        }
2119        if let Some(o) = params.offset {
2120            sql = sql.bind(o as i64);
2121        }
2122        let rows = sql.fetch_all(&self.pool).await?;
2123        Ok(rows
2124            .into_iter()
2125            .map(|row| Log {
2126                id: Some(row.get("id")),
2127                entity_id: row.get("entity_id"),
2128                room_id: row.get("room_id"),
2129                body: row.get("body"),
2130                log_type: row.get("type"),
2131                created_at: row.get("created_at"),
2132            })
2133            .collect())
2134    }
2135
2136    async fn get_agent_run_summaries(
2137        &self,
2138        params: RunSummaryQuery,
2139    ) -> Result<AgentRunSummaryResult> {
2140        let mut base = String::from("FROM llm_costs WHERE 1=1");
2141        let mut idx = 1;
2142        if params.agent_id.is_some() {
2143            base.push_str(&format!(" AND agent_id = ${}", idx));
2144            idx += 1;
2145        }
2146        if params.status.is_some() {
2147            base.push_str(&format!(" AND success = ${}", idx));
2148            idx += 1;
2149        }
2150        let count_sql = format!("SELECT COUNT(*) as count {}", base);
2151        let mut count_q = sqlx::query(&count_sql);
2152        if let Some(aid) = params.agent_id {
2153            count_q = count_q.bind(aid);
2154        }
2155        if let Some(st) = params.status {
2156            count_q = count_q.bind(matches!(st, RunStatus::Completed));
2157        }
2158        let count_row = count_q.fetch_one(&self.pool).await?;
2159        let total: i64 = count_row.get("count");
2160
2161        let mut query = format!(
2162            "SELECT id, timestamp, success, conversation_id {} ORDER BY timestamp DESC",
2163            base
2164        );
2165        if params.limit.is_some() {
2166            query.push_str(&format!(" LIMIT ${}", idx));
2167            idx += 1;
2168        }
2169        if params.offset.is_some() {
2170            query.push_str(&format!(" OFFSET ${}", idx));
2171        }
2172        let mut sql = sqlx::query(&query);
2173        if let Some(aid) = params.agent_id {
2174            sql = sql.bind(aid);
2175        }
2176        if let Some(st) = params.status {
2177            sql = sql.bind(matches!(st, RunStatus::Completed));
2178        }
2179        if let Some(l) = params.limit {
2180            sql = sql.bind(l as i64);
2181        }
2182        if let Some(o) = params.offset {
2183            sql = sql.bind(o as i64);
2184        }
2185        let rows = sql.fetch_all(&self.pool).await?;
2186        let mut runs = Vec::new();
2187        for row in rows {
2188            let id: uuid::Uuid = row.get("id");
2189            let ts: chrono::DateTime<chrono::Utc> = row.get("timestamp");
2190            let success: bool = row.get("success");
2191            let status = if success {
2192                RunStatus::Completed
2193            } else {
2194                RunStatus::Error
2195            };
2196            runs.push(AgentRunSummary {
2197                run_id: id.to_string(),
2198                status,
2199                started_at: Some(ts.timestamp()),
2200                ended_at: Some(ts.timestamp()),
2201                duration_ms: None,
2202                message_id: None,
2203                room_id: None,
2204                entity_id: None,
2205                metadata: None,
2206                counts: None,
2207            });
2208        }
2209        Ok(AgentRunSummaryResult {
2210            runs,
2211            total: total as usize,
2212            has_more: params
2213                .limit
2214                .map(|l| (l as i64) + (params.offset.unwrap_or(0) as i64) < total)
2215                .unwrap_or(false),
2216        })
2217    }
2218}
2219
2220impl PostgresAdapter {
2221    pub async fn persist_llm_cost(&self, record: LLMCostRecord) -> Result<()> {
2222        sqlx::query(
2223            "INSERT INTO llm_costs (id, timestamp, agent_id, user_id, conversation_id, action_name, evaluator_name, provider, model, temperature, prompt_tokens, completion_tokens, total_tokens, cached_tokens, input_cost_usd, output_cost_usd, total_cost_usd, latency_ms, ttft_ms, success, error, prompt_hash, prompt_preview)
2224             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23)"
2225        )
2226        .bind(record.id)
2227        .bind(record.timestamp)
2228        .bind(record.agent_id)
2229        .bind(record.user_id)
2230        .bind(record.conversation_id)
2231        .bind(record.action_name)
2232        .bind(record.evaluator_name)
2233        .bind(record.provider)
2234        .bind(record.model)
2235        .bind(record.temperature)
2236        .bind(record.prompt_tokens as i64)
2237        .bind(record.completion_tokens as i64)
2238        .bind(record.total_tokens as i64)
2239        .bind(record.cached_tokens.map(|v| v as i64))
2240        .bind(record.input_cost_usd)
2241        .bind(record.output_cost_usd)
2242        .bind(record.total_cost_usd)
2243        .bind(record.latency_ms as i64)
2244        .bind(record.ttft_ms.map(|v| v as i64))
2245        .bind(record.success)
2246        .bind(record.error)
2247        .bind(record.prompt_hash)
2248        .bind(record.prompt_preview)
2249        .execute(&self.pool)
2250        .await?;
2251        Ok(())
2252    }
2253}
2254
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder test: it asserts a constant and does not construct or
    /// exercise a `PostgresAdapter`.
    #[test]
    fn test_postgres_adapter_creation() {
        assert!(true);
    }
}