zoey_storage_sql/
sqlite.rs

1//! SQLite database adapter
2//!
3//! Implements core database operations for SQLite.
4//! Note: Vector search is not supported - for embedding-based search, use PostgreSQL with pgvector.
5
6use async_trait::async_trait;
7use zoey_core::observability::types::LLMCostRecord;
8use zoey_core::{types::*, ZoeyError, Result};
9use sqlx::sqlite::SqliteConnectOptions;
10use sqlx::{Row, SqlitePool};
11use std::str::FromStr;
12use tracing::{debug, info};
13
/// Builds the `Err` value returned by adapter methods that SQLite does not support.
///
/// `$operation` is interpolated into the message with `{}`, so any expression
/// implementing `Display` (typically the method name) can be passed.
macro_rules! not_implemented {
    ($operation:expr) => {{
        let message = format!(
            "SQLite: {} not implemented. Use PostgreSQL for full functionality",
            $operation
        );
        Err(ZoeyError::database(message))
    }};
}
23
/// SQLite database adapter.
///
/// Owns a `sqlx` connection pool; the adapter methods visible in this file
/// run their queries against that pool.
pub struct SqliteAdapter {
    /// Connection pool the adapter executes its statements on.
    pool: SqlitePool,
}
28
29impl SqliteAdapter {
30    /// Create a new SQLite adapter
31    pub async fn new(database_path: &str) -> Result<Self> {
32        info!("Opening SQLite database at: {}", database_path);
33
34        let opts = SqliteConnectOptions::from_str(database_path)
35            .map_err(|e| ZoeyError::database(format!("Invalid SQLite URL: {}", e)))?
36            .create_if_missing(true);
37
38        let pool = SqlitePool::connect_with(opts)
39            .await
40            .map_err(|e| ZoeyError::DatabaseSqlx(e))?;
41
42        Ok(Self { pool })
43    }
44
45    /// Initialize database schema
46    async fn init_schema(&self) -> Result<()> {
47        debug!("Initializing SQLite schema...");
48
49        // Enable foreign keys
50        sqlx::query("PRAGMA foreign_keys = ON")
51            .execute(&self.pool)
52            .await?;
53
54        // ============================================================
55        // CREATE TABLES
56        // ============================================================
57
58        // Agents table (root entity)
59        sqlx::query(
60            r#"
61            CREATE TABLE IF NOT EXISTS agents (
62                id TEXT PRIMARY KEY,
63                name TEXT NOT NULL,
64                character TEXT NOT NULL,
65                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
66                updated_at INTEGER
67            )
68        "#,
69        )
70        .execute(&self.pool)
71        .await?;
72
73        // Entities table (belongs to agent)
74        sqlx::query(
75            r#"
76            CREATE TABLE IF NOT EXISTS entities (
77                id TEXT PRIMARY KEY,
78                agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
79                name TEXT,
80                username TEXT,
81                email TEXT,
82                avatar_url TEXT,
83                metadata TEXT DEFAULT '{}',
84                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
85            )
86        "#,
87        )
88        .execute(&self.pool)
89        .await?;
90
91        // Worlds table (belongs to agent)
92        sqlx::query(
93            r#"
94            CREATE TABLE IF NOT EXISTS worlds (
95                id TEXT PRIMARY KEY,
96                name TEXT NOT NULL,
97                agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
98                server_id TEXT,
99                metadata TEXT DEFAULT '{}',
100                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
101            )
102        "#,
103        )
104        .execute(&self.pool)
105        .await?;
106
107        // Rooms table (belongs to world)
108        sqlx::query(
109            r#"
110            CREATE TABLE IF NOT EXISTS rooms (
111                id TEXT PRIMARY KEY,
112                agent_id TEXT REFERENCES agents(id) ON DELETE SET NULL,
113                name TEXT NOT NULL,
114                source TEXT NOT NULL,
115                type TEXT NOT NULL,
116                channel_id TEXT,
117                server_id TEXT,
118                world_id TEXT NOT NULL REFERENCES worlds(id) ON DELETE CASCADE,
119                metadata TEXT DEFAULT '{}',
120                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
121            )
122        "#,
123        )
124        .execute(&self.pool)
125        .await?;
126
127        // Memories table (core data)
128        sqlx::query(
129            r#"
130            CREATE TABLE IF NOT EXISTS memories (
131                id TEXT PRIMARY KEY,
132                entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
133                agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
134                room_id TEXT NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
135                content TEXT NOT NULL,
136                metadata TEXT,
137                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
138                unique_flag INTEGER DEFAULT 0
139            )
140        "#,
141        )
142        .execute(&self.pool)
143        .await?;
144
145        // Participants junction table
146        sqlx::query(
147            r#"
148            CREATE TABLE IF NOT EXISTS participants (
149                entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
150                room_id TEXT NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
151                joined_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
152                metadata TEXT DEFAULT '{}',
153                PRIMARY KEY (entity_id, room_id)
154            )
155        "#,
156        )
157        .execute(&self.pool)
158        .await?;
159
160        // Relationships table (includes type in unique constraint)
161        sqlx::query(
162            r#"
163            CREATE TABLE IF NOT EXISTS relationships (
164                id TEXT PRIMARY KEY,
165                entity_id_a TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
166                entity_id_b TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
167                type TEXT NOT NULL,
168                agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
169                metadata TEXT DEFAULT '{}',
170                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
171                UNIQUE (entity_id_a, entity_id_b, type)
172            )
173        "#,
174        )
175        .execute(&self.pool)
176        .await?;
177
178        // Components table (ECS-style)
179        sqlx::query(
180            r#"
181            CREATE TABLE IF NOT EXISTS components (
182                id TEXT PRIMARY KEY,
183                entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
184                world_id TEXT NOT NULL REFERENCES worlds(id) ON DELETE CASCADE,
185                source_entity_id TEXT REFERENCES entities(id) ON DELETE SET NULL,
186                type TEXT NOT NULL,
187                data TEXT NOT NULL,
188                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
189                updated_at INTEGER,
190                UNIQUE (entity_id, world_id, type, source_entity_id)
191            )
192        "#,
193        )
194        .execute(&self.pool)
195        .await?;
196
197        // Tasks table (scheduled work)
198        sqlx::query(
199            r#"
200            CREATE TABLE IF NOT EXISTS tasks (
201                id TEXT PRIMARY KEY,
202                agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
203                task_type TEXT NOT NULL,
204                data TEXT NOT NULL,
205                status TEXT NOT NULL DEFAULT 'PENDING',
206                priority INTEGER DEFAULT 0,
207                scheduled_at INTEGER,
208                executed_at INTEGER,
209                retry_count INTEGER DEFAULT 0,
210                max_retries INTEGER DEFAULT 3,
211                error TEXT,
212                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
213                updated_at INTEGER
214            )
215        "#,
216        )
217        .execute(&self.pool)
218        .await?;
219
220        // Logs table
221        sqlx::query(
222            r#"
223            CREATE TABLE IF NOT EXISTS logs (
224                id TEXT PRIMARY KEY,
225                entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
226                room_id TEXT REFERENCES rooms(id) ON DELETE SET NULL,
227                body TEXT NOT NULL,
228                type TEXT NOT NULL,
229                created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
230            )
231        "#,
232        )
233        .execute(&self.pool)
234        .await?;
235
236        // ============================================================
237        // CREATE INDICES
238        // ============================================================
239
240        // -- Entities indices --
241        sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_agent_id ON entities(agent_id)")
242            .execute(&self.pool)
243            .await?;
244
245        sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_username ON entities(username)")
246            .execute(&self.pool)
247            .await?;
248
249        // -- Worlds indices --
250        sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_agent_id ON worlds(agent_id)")
251            .execute(&self.pool)
252            .await?;
253
254        sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_server_id ON worlds(server_id)")
255            .execute(&self.pool)
256            .await?;
257
258        // -- Rooms indices --
259        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_world_id ON rooms(world_id)")
260            .execute(&self.pool)
261            .await?;
262
263        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_agent_id ON rooms(agent_id)")
264            .execute(&self.pool)
265            .await?;
266
267        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_channel_id ON rooms(channel_id)")
268            .execute(&self.pool)
269            .await?;
270
271        sqlx::query(
272            "CREATE INDEX IF NOT EXISTS idx_rooms_source_server ON rooms(source, server_id)",
273        )
274        .execute(&self.pool)
275        .await?;
276
277        sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_type ON rooms(type)")
278            .execute(&self.pool)
279            .await?;
280
281        // -- Memories indices --
282        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id)")
283            .execute(&self.pool)
284            .await?;
285
286        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_room_id ON memories(room_id)")
287            .execute(&self.pool)
288            .await?;
289
290        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_entity_id ON memories(entity_id)")
291            .execute(&self.pool)
292            .await?;
293
294        sqlx::query(
295            "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at DESC)",
296        )
297        .execute(&self.pool)
298        .await?;
299
300        sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_room_created ON memories(agent_id, room_id, created_at DESC)")
301            .execute(&self.pool)
302            .await?;
303
304        // -- Participants indices --
305        sqlx::query(
306            "CREATE INDEX IF NOT EXISTS idx_participants_entity_id ON participants(entity_id)",
307        )
308        .execute(&self.pool)
309        .await?;
310
311        sqlx::query("CREATE INDEX IF NOT EXISTS idx_participants_room_id ON participants(room_id)")
312            .execute(&self.pool)
313            .await?;
314
315        // -- Relationships indices --
316        sqlx::query(
317            "CREATE INDEX IF NOT EXISTS idx_relationships_entity_a ON relationships(entity_id_a)",
318        )
319        .execute(&self.pool)
320        .await?;
321
322        sqlx::query(
323            "CREATE INDEX IF NOT EXISTS idx_relationships_entity_b ON relationships(entity_id_b)",
324        )
325        .execute(&self.pool)
326        .await?;
327
328        sqlx::query(
329            "CREATE INDEX IF NOT EXISTS idx_relationships_agent_id ON relationships(agent_id)",
330        )
331        .execute(&self.pool)
332        .await?;
333
334        sqlx::query("CREATE INDEX IF NOT EXISTS idx_relationships_type ON relationships(type)")
335            .execute(&self.pool)
336            .await?;
337
338        // -- Components indices --
339        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_entity_id ON components(entity_id)")
340            .execute(&self.pool)
341            .await?;
342
343        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_world_id ON components(world_id)")
344            .execute(&self.pool)
345            .await?;
346
347        sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_type ON components(type)")
348            .execute(&self.pool)
349            .await?;
350
351        sqlx::query(
352            "CREATE INDEX IF NOT EXISTS idx_components_entity_type ON components(entity_id, type)",
353        )
354        .execute(&self.pool)
355        .await?;
356
357        // -- Tasks indices --
358        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_id ON tasks(agent_id)")
359            .execute(&self.pool)
360            .await?;
361
362        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
363            .execute(&self.pool)
364            .await?;
365
366        sqlx::query(
367            "CREATE INDEX IF NOT EXISTS idx_tasks_status_scheduled ON tasks(status, scheduled_at)",
368        )
369        .execute(&self.pool)
370        .await?;
371
372        sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_status ON tasks(agent_id, status)")
373            .execute(&self.pool)
374            .await?;
375
376        // -- Logs indices --
377        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_id ON logs(entity_id)")
378            .execute(&self.pool)
379            .await?;
380
381        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_room_id ON logs(room_id)")
382            .execute(&self.pool)
383            .await?;
384
385        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_type ON logs(type)")
386            .execute(&self.pool)
387            .await?;
388
389        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_created_at ON logs(created_at DESC)")
390            .execute(&self.pool)
391            .await?;
392
393        sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_created ON logs(entity_id, created_at DESC)")
394            .execute(&self.pool)
395            .await?;
396
397        // ============================================================
398        // OBSERVABILITY TABLES
399        // ============================================================
400
401        sqlx::query(
402            r#"
403            CREATE TABLE IF NOT EXISTS llm_costs (
404                id TEXT PRIMARY KEY,
405                timestamp INTEGER NOT NULL,
406
407                agent_id TEXT NOT NULL,
408                user_id TEXT,
409                conversation_id TEXT,
410                action_name TEXT,
411                evaluator_name TEXT,
412
413                provider TEXT NOT NULL,
414                model TEXT NOT NULL,
415                temperature REAL NOT NULL,
416
417                prompt_tokens INTEGER NOT NULL,
418                completion_tokens INTEGER NOT NULL,
419                total_tokens INTEGER NOT NULL,
420                cached_tokens INTEGER,
421
422                input_cost_usd REAL NOT NULL,
423                output_cost_usd REAL NOT NULL,
424                total_cost_usd REAL NOT NULL,
425
426                latency_ms INTEGER NOT NULL,
427                ttft_ms INTEGER,
428
429                success INTEGER NOT NULL,
430                error TEXT,
431
432                prompt_hash TEXT,
433                prompt_preview TEXT
434            )
435        "#,
436        )
437        .execute(&self.pool)
438        .await?;
439
440        sqlx::query(
441            r#"
442            CREATE TABLE IF NOT EXISTS stored_prompts (
443                id TEXT PRIMARY KEY,
444                timestamp INTEGER NOT NULL,
445                cost_record_id TEXT REFERENCES llm_costs(id) ON DELETE CASCADE,
446                agent_id TEXT NOT NULL,
447                conversation_id TEXT,
448                prompt_hash TEXT NOT NULL,
449                prompt_text TEXT,
450                prompt_length INTEGER NOT NULL,
451                sanitized INTEGER NOT NULL,
452                sanitization_level TEXT NOT NULL,
453                completion_text TEXT,
454                completion_length INTEGER NOT NULL DEFAULT 0,
455                model TEXT NOT NULL,
456                temperature REAL NOT NULL
457            )
458        "#,
459        )
460        .execute(&self.pool)
461        .await?;
462
463        // Observability indices
464        sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_agent_timestamp ON llm_costs(agent_id, timestamp DESC)")
465            .execute(&self.pool)
466            .await?;
467
468        sqlx::query(
469            "CREATE INDEX IF NOT EXISTS idx_llm_costs_provider_model ON llm_costs(provider, model)",
470        )
471        .execute(&self.pool)
472        .await?;
473
474        sqlx::query(
475            "CREATE INDEX IF NOT EXISTS idx_llm_costs_timestamp ON llm_costs(timestamp DESC)",
476        )
477        .execute(&self.pool)
478        .await?;
479
480        sqlx::query(
481            "CREATE INDEX IF NOT EXISTS idx_llm_costs_conversation ON llm_costs(conversation_id)",
482        )
483        .execute(&self.pool)
484        .await?;
485
486        sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_agent_timestamp ON stored_prompts(agent_id, timestamp DESC)")
487            .execute(&self.pool)
488            .await?;
489
490        sqlx::query(
491            "CREATE INDEX IF NOT EXISTS idx_stored_prompts_hash ON stored_prompts(prompt_hash)",
492        )
493        .execute(&self.pool)
494        .await?;
495
496        sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_cost_record ON stored_prompts(cost_record_id)")
497            .execute(&self.pool)
498            .await?;
499
500        info!("SQLite schema initialized successfully");
501        Ok(())
502    }
503}
504
505#[async_trait]
506impl IDatabaseAdapter for SqliteAdapter {
507    fn db(&self) -> &dyn std::any::Any {
508        &self.pool
509    }
510
511    async fn initialize(&mut self, _config: Option<serde_json::Value>) -> Result<()> {
512        self.init_schema().await
513    }
514
515    async fn is_ready(&self) -> Result<bool> {
516        match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
517            Ok(_) => Ok(true),
518            Err(_) => Ok(false),
519        }
520    }
521
522    async fn close(&mut self) -> Result<()> {
523        self.pool.close().await;
524        Ok(())
525    }
526
527    async fn get_connection(&self) -> Result<Box<dyn std::any::Any + Send>> {
528        Ok(Box::new(self.pool.clone()))
529    }
530
531    async fn run_plugin_migrations(
532        &self,
533        plugins: Vec<PluginMigration>,
534        options: MigrationOptions,
535    ) -> Result<()> {
536        if plugins.is_empty() {
537            return Ok(());
538        }
539
540        // SQLite-safe identifier validation
541        fn validate_identifier(name: &str) -> Result<&str> {
542            if name.len() > 64 {
543                return Err(ZoeyError::validation(format!(
544                    "Identifier too long: {} (max 64 characters)",
545                    name.len()
546                )));
547            }
548            if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
549                return Err(ZoeyError::validation(format!(
550                    "Invalid identifier '{}': only alphanumeric and underscore allowed",
551                    name
552                )));
553            }
554            Ok(name)
555        }
556
557        for plugin in plugins {
558            if let Some(schema) = plugin.schema {
559                if options.verbose {
560                    tracing::info!("Applying schema for plugin '{}' (SQLite)", plugin.name);
561                }
562
563                let schema_obj = schema.as_object().ok_or_else(|| {
564                    ZoeyError::validation(format!(
565                        "Invalid schema for plugin '{}': expected JSON object",
566                        plugin.name
567                    ))
568                })?;
569
570                for (table_name, table_def) in schema_obj.iter() {
571                    let table_name = validate_identifier(table_name)?;
572
573                    let columns_obj = table_def
574                        .get("columns")
575                        .and_then(|v| v.as_object())
576                        .ok_or_else(|| {
577                            ZoeyError::validation(format!(
578                                "Invalid table definition for '{}': missing 'columns'",
579                                table_name
580                            ))
581                        })?;
582
583                    let mut cols_sql: Vec<String> = Vec::new();
584                    for (col_name, col_type_val) in columns_obj.iter() {
585                        let col_name = validate_identifier(col_name)?;
586                        let mut col_type = col_type_val
587                            .as_str()
588                            .ok_or_else(|| {
589                                ZoeyError::validation(format!(
590                                    "Invalid type for column '{}.{}'",
591                                    table_name, col_name
592                                ))
593                            })?
594                            .to_string();
595
596                        // Map common Postgres types to SQLite equivalents
597                        col_type = col_type
598                            .replace("UUID", "TEXT")
599                            .replace("TIMESTAMP", "INTEGER")
600                            .replace("JSONB", "TEXT")
601                            .replace("BOOLEAN", "INTEGER")
602                            .replace("TEXT", "TEXT");
603
604                        cols_sql.push(format!("{} {}", col_name, col_type));
605                    }
606
607                    let create_sql = format!(
608                        "CREATE TABLE IF NOT EXISTS {} ({})",
609                        table_name,
610                        cols_sql.join(", ")
611                    );
612
613                    if options.verbose {
614                        tracing::debug!("{}", create_sql);
615                    }
616                    if !options.dry_run {
617                        sqlx::query(&create_sql).execute(&self.pool).await?;
618                    }
619                }
620
621                if options.verbose {
622                    tracing::info!("✓ Schema applied for plugin '{}' (SQLite)", plugin.name);
623                }
624            }
625        }
626        Ok(())
627    }
628
629    async fn persist_llm_cost(&self, record: LLMCostRecord) -> zoey_core::Result<()> {
630        sqlx::query(
631            r#"
632            INSERT INTO llm_costs (
633                id, timestamp, agent_id, user_id, conversation_id, action_name, evaluator_name,
634                provider, model, temperature, prompt_tokens, completion_tokens, total_tokens, cached_tokens,
635                input_cost_usd, output_cost_usd, total_cost_usd, latency_ms, ttft_ms,
636                success, error, prompt_hash, prompt_preview
637            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
638            "#
639        )
640        .bind(record.id.to_string())
641        .bind(record.timestamp.timestamp())
642        .bind(record.agent_id.to_string())
643        .bind(record.user_id)
644        .bind(record.conversation_id.map(|v| v.to_string()))
645        .bind(record.action_name)
646        .bind(record.evaluator_name)
647        .bind(record.provider)
648        .bind(record.model)
649        .bind(record.temperature)
650        .bind(record.prompt_tokens as i64)
651        .bind(record.completion_tokens as i64)
652        .bind(record.total_tokens as i64)
653        .bind(record.cached_tokens.map(|v| v as i64))
654        .bind(record.input_cost_usd)
655        .bind(record.output_cost_usd)
656        .bind(record.total_cost_usd)
657        .bind(record.latency_ms as i64)
658        .bind(record.ttft_ms.map(|v| v as i64))
659        .bind(if record.success { 1 } else { 0 })
660        .bind(record.error)
661        .bind(record.prompt_hash)
662        .bind(record.prompt_preview)
663        .execute(&self.pool)
664        .await?;
665
666        Ok(())
667    }
668
669    // Agent Operations - Fully Implemented
670
671    async fn get_agent(&self, agent_id: UUID) -> Result<Option<Agent>> {
672        let row = sqlx::query(
673            "SELECT id, name, character, created_at, updated_at FROM agents WHERE id = ?",
674        )
675        .bind(agent_id.to_string())
676        .fetch_optional(&self.pool)
677        .await?;
678
679        match row {
680            Some(row) => {
681                let id_str: String = row.get("id");
682                let character_str: String = row.get("character");
683
684                Ok(Some(Agent {
685                    id: uuid::Uuid::parse_str(&id_str)
686                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
687                    name: row.get("name"),
688                    character: serde_json::from_str(&character_str)?,
689                    created_at: row.get("created_at"),
690                    updated_at: row.get("updated_at"),
691                }))
692            }
693            None => Ok(None),
694        }
695    }
696
697    async fn get_agents(&self) -> Result<Vec<Agent>> {
698        let rows = sqlx::query("SELECT id, name, character, created_at, updated_at FROM agents")
699            .fetch_all(&self.pool)
700            .await?;
701
702        let mut agents = Vec::new();
703        for row in rows {
704            let id_str: String = row.get("id");
705            let character_str: String = row.get("character");
706
707            agents.push(Agent {
708                id: uuid::Uuid::parse_str(&id_str)
709                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
710                name: row.get("name"),
711                character: serde_json::from_str(&character_str)?,
712                created_at: row.get("created_at"),
713                updated_at: row.get("updated_at"),
714            });
715        }
716
717        Ok(agents)
718    }
719
720    async fn create_agent(&self, agent: &Agent) -> Result<bool> {
721        let result = sqlx::query(
722            "INSERT INTO agents (id, name, character, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"
723        )
724        .bind(agent.id.to_string())
725        .bind(&agent.name)
726        .bind(serde_json::to_string(&agent.character)?)
727        .bind(agent.created_at)
728        .bind(agent.updated_at)
729        .execute(&self.pool)
730        .await?;
731
732        Ok(result.rows_affected() > 0)
733    }
734
735    async fn update_agent(&self, agent_id: UUID, agent: &Agent) -> Result<bool> {
736        let result =
737            sqlx::query("UPDATE agents SET name = ?, character = ?, updated_at = ? WHERE id = ?")
738                .bind(&agent.name)
739                .bind(serde_json::to_string(&agent.character)?)
740                .bind(chrono::Utc::now().timestamp())
741                .bind(agent_id.to_string())
742                .execute(&self.pool)
743                .await?;
744
745        Ok(result.rows_affected() > 0)
746    }
747
748    async fn delete_agent(&self, agent_id: UUID) -> Result<bool> {
749        let result = sqlx::query("DELETE FROM agents WHERE id = ?")
750            .bind(agent_id.to_string())
751            .execute(&self.pool)
752            .await?;
753
754        Ok(result.rows_affected() > 0)
755    }
756
    /// No-op for SQLite: accepts any `_dimension` and always returns `Ok(())`.
    ///
    /// Nothing is created or checked here, so a successful return says
    /// nothing about embedding storage being available.
    async fn ensure_embedding_dimension(&self, _dimension: usize) -> Result<()> {
        // SQLite doesn't have native vector support
        // Would use a library like sqlite-vss or fall back to BM25
        Ok(())
    }
762
763    async fn get_entities_by_ids(&self, entity_ids: Vec<UUID>) -> Result<Vec<Entity>> {
764        if entity_ids.is_empty() {
765            return Ok(vec![]);
766        }
767
768        let placeholders = entity_ids
769            .iter()
770            .map(|_| "?")
771            .collect::<Vec<_>>()
772            .join(", ");
773        let query = format!("SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id IN ({})", placeholders);
774
775        let mut query_builder = sqlx::query(&query);
776        for id in &entity_ids {
777            query_builder = query_builder.bind(id.to_string());
778        }
779
780        let rows = query_builder.fetch_all(&self.pool).await?;
781
782        let mut entities = Vec::new();
783        for row in rows {
784            let id_str: String = row.get("id");
785            let agent_id_str: String = row.get("agent_id");
786            let metadata_str: String = row.get("metadata");
787
788            entities.push(Entity {
789                id: uuid::Uuid::parse_str(&id_str)
790                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
791                agent_id: uuid::Uuid::parse_str(&agent_id_str)
792                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
793                name: row.get("name"),
794                username: row.get("username"),
795                email: row.get("email"),
796                avatar_url: row.get("avatar_url"),
797                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
798                created_at: row.get("created_at"),
799            });
800        }
801
802        Ok(entities)
803    }
804
805    async fn get_entities_for_room(
806        &self,
807        room_id: UUID,
808        _include_components: bool,
809    ) -> Result<Vec<Entity>> {
810        let rows = sqlx::query(
811            r#"
812            SELECT e.id, e.agent_id, e.name, e.username, e.email, e.avatar_url, e.metadata, e.created_at
813            FROM entities e
814            INNER JOIN participants p ON e.id = p.entity_id
815            WHERE p.room_id = ?
816            "#,
817        )
818        .bind(room_id.to_string())
819        .fetch_all(&self.pool)
820        .await?;
821
822        let mut entities = Vec::new();
823        for row in rows {
824            let id_str: String = row.get("id");
825            let agent_id_str: String = row.get("agent_id");
826            let metadata_str: String = row.get("metadata");
827
828            entities.push(Entity {
829                id: uuid::Uuid::parse_str(&id_str)
830                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
831                agent_id: uuid::Uuid::parse_str(&agent_id_str)
832                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
833                name: row.get("name"),
834                username: row.get("username"),
835                email: row.get("email"),
836                avatar_url: row.get("avatar_url"),
837                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
838                created_at: row.get("created_at"),
839            });
840        }
841
842        Ok(entities)
843    }
844
845    async fn create_entities(&self, entities: Vec<Entity>) -> Result<bool> {
846        for entity in entities {
847            sqlx::query(
848                "INSERT OR REPLACE INTO entities (id, agent_id, name, username, email, avatar_url, metadata, created_at)
849                 VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
850            )
851            .bind(entity.id.to_string())
852            .bind(entity.agent_id.to_string())
853            .bind(&entity.name)
854            .bind(&entity.username)
855            .bind(&entity.email)
856            .bind(&entity.avatar_url)
857            .bind(serde_json::to_string(&entity.metadata)?)
858            .bind(entity.created_at)
859            .execute(&self.pool)
860            .await?;
861        }
862        Ok(true)
863    }
864
865    async fn update_entity(&self, entity: &Entity) -> Result<()> {
866        sqlx::query(
867            "UPDATE entities SET name = ?, username = ?, email = ?, avatar_url = ?, metadata = ? WHERE id = ?"
868        )
869        .bind(&entity.name)
870        .bind(&entity.username)
871        .bind(&entity.email)
872        .bind(&entity.avatar_url)
873        .bind(serde_json::to_string(&entity.metadata)?)
874        .bind(entity.id.to_string())
875        .execute(&self.pool)
876        .await?;
877
878        Ok(())
879    }
880
881    async fn get_entity_by_id(&self, entity_id: UUID) -> Result<Option<Entity>> {
882        let row = sqlx::query(
883            "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id = ?"
884        )
885        .bind(entity_id.to_string())
886        .fetch_optional(&self.pool)
887        .await?;
888
889        match row {
890            Some(row) => {
891                let id_str: String = row.get("id");
892                let agent_id_str: String = row.get("agent_id");
893                let metadata_str: String = row.get("metadata");
894
895                Ok(Some(Entity {
896                    id: uuid::Uuid::parse_str(&id_str)
897                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
898                    agent_id: uuid::Uuid::parse_str(&agent_id_str)
899                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
900                    name: row.get("name"),
901                    username: row.get("username"),
902                    email: row.get("email"),
903                    avatar_url: row.get("avatar_url"),
904                    metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
905                    created_at: row.get("created_at"),
906                }))
907            }
908            None => Ok(None),
909        }
910    }
911
912    async fn get_component(
913        &self,
914        entity_id: UUID,
915        component_type: &str,
916        world_id: Option<UUID>,
917        source_entity_id: Option<UUID>,
918    ) -> Result<Option<Component>> {
919        let mut query = String::from(
920            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = ? AND type = ?",
921        );
922        let mut bindings: Vec<String> = vec![entity_id.to_string(), component_type.to_string()];
923
924        if let Some(wid) = world_id {
925            query.push_str(" AND world_id = ?");
926            bindings.push(wid.to_string());
927        }
928        if let Some(sid) = source_entity_id {
929            query.push_str(" AND source_entity_id = ?");
930            bindings.push(sid.to_string());
931        } else {
932            query.push_str(" AND source_entity_id IS NULL");
933        }
934
935        let mut query_builder = sqlx::query(&query);
936        for binding in &bindings {
937            query_builder = query_builder.bind(binding);
938        }
939
940        let row = query_builder.fetch_optional(&self.pool).await?;
941
942        match row {
943            Some(row) => {
944                let id_str: String = row.get("id");
945                let entity_id_str: String = row.get("entity_id");
946                let world_id_str: String = row.get("world_id");
947                let source_entity_id_str: Option<String> = row.get("source_entity_id");
948                let data_str: String = row.get("data");
949
950                Ok(Some(Component {
951                    id: uuid::Uuid::parse_str(&id_str)
952                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
953                    entity_id: uuid::Uuid::parse_str(&entity_id_str)
954                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
955                    world_id: uuid::Uuid::parse_str(&world_id_str)
956                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
957                    source_entity_id: source_entity_id_str
958                        .map(|s| uuid::Uuid::parse_str(&s).ok())
959                        .flatten(),
960                    component_type: row.get("type"),
961                    data: serde_json::from_str(&data_str)?,
962                    created_at: row.get("created_at"),
963                    updated_at: row.get("updated_at"),
964                }))
965            }
966            None => Ok(None),
967        }
968    }
969
970    async fn get_components(
971        &self,
972        entity_id: UUID,
973        world_id: Option<UUID>,
974        source_entity_id: Option<UUID>,
975    ) -> Result<Vec<Component>> {
976        let mut query = String::from(
977            "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = ?",
978        );
979        let mut bindings: Vec<String> = vec![entity_id.to_string()];
980
981        if let Some(wid) = world_id {
982            query.push_str(" AND world_id = ?");
983            bindings.push(wid.to_string());
984        }
985        if let Some(sid) = source_entity_id {
986            query.push_str(" AND source_entity_id = ?");
987            bindings.push(sid.to_string());
988        }
989
990        let mut query_builder = sqlx::query(&query);
991        for binding in &bindings {
992            query_builder = query_builder.bind(binding);
993        }
994
995        let rows = query_builder.fetch_all(&self.pool).await?;
996
997        let mut components = Vec::new();
998        for row in rows {
999            let id_str: String = row.get("id");
1000            let entity_id_str: String = row.get("entity_id");
1001            let world_id_str: String = row.get("world_id");
1002            let source_entity_id_str: Option<String> = row.get("source_entity_id");
1003            let data_str: String = row.get("data");
1004
1005            components.push(Component {
1006                id: uuid::Uuid::parse_str(&id_str)
1007                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1008                entity_id: uuid::Uuid::parse_str(&entity_id_str)
1009                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1010                world_id: uuid::Uuid::parse_str(&world_id_str)
1011                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1012                source_entity_id: source_entity_id_str
1013                    .map(|s| uuid::Uuid::parse_str(&s).ok())
1014                    .flatten(),
1015                component_type: row.get("type"),
1016                data: serde_json::from_str(&data_str)?,
1017                created_at: row.get("created_at"),
1018                updated_at: row.get("updated_at"),
1019            });
1020        }
1021
1022        Ok(components)
1023    }
1024
1025    async fn create_component(&self, component: &Component) -> Result<bool> {
1026        let result = sqlx::query(
1027            r#"
1028            INSERT INTO components (id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at)
1029            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1030            ON CONFLICT (entity_id, world_id, type, source_entity_id) DO UPDATE SET
1031                data = excluded.data,
1032                updated_at = excluded.updated_at
1033            "#,
1034        )
1035        .bind(component.id.to_string())
1036        .bind(component.entity_id.to_string())
1037        .bind(component.world_id.to_string())
1038        .bind(component.source_entity_id.map(|id| id.to_string()))
1039        .bind(&component.component_type)
1040        .bind(serde_json::to_string(&component.data)?)
1041        .bind(component.created_at)
1042        .bind(component.updated_at)
1043        .execute(&self.pool)
1044        .await?;
1045
1046        Ok(result.rows_affected() > 0)
1047    }
1048
1049    async fn update_component(&self, component: &Component) -> Result<()> {
1050        sqlx::query("UPDATE components SET data = ?, updated_at = ? WHERE id = ?")
1051            .bind(serde_json::to_string(&component.data)?)
1052            .bind(chrono::Utc::now().timestamp())
1053            .bind(component.id.to_string())
1054            .execute(&self.pool)
1055            .await?;
1056
1057        Ok(())
1058    }
1059
1060    async fn delete_component(&self, component_id: UUID) -> Result<()> {
1061        sqlx::query("DELETE FROM components WHERE id = ?")
1062            .bind(component_id.to_string())
1063            .execute(&self.pool)
1064            .await?;
1065
1066        Ok(())
1067    }
1068
1069    async fn get_memories(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1070        let mut query = String::from("SELECT id, entity_id, agent_id, room_id, content, metadata, created_at, unique_flag FROM memories WHERE 1=1");
1071
1072        let mut bindings: Vec<String> = Vec::new();
1073
1074        if let Some(agent_id) = params.agent_id {
1075            query.push_str(" AND agent_id = ?");
1076            bindings.push(agent_id.to_string());
1077        }
1078        if let Some(room_id) = params.room_id {
1079            query.push_str(" AND room_id = ?");
1080            bindings.push(room_id.to_string());
1081        }
1082        if let Some(entity_id) = params.entity_id {
1083            query.push_str(" AND entity_id = ?");
1084            bindings.push(entity_id.to_string());
1085        }
1086        if let Some(unique) = params.unique {
1087            query.push_str(" AND unique_flag = ?");
1088            bindings.push(if unique {
1089                "1".to_string()
1090            } else {
1091                "0".to_string()
1092            });
1093        }
1094
1095        query.push_str(" ORDER BY created_at DESC");
1096
1097        if let Some(count) = params.count {
1098            query.push_str(&format!(" LIMIT {}", count));
1099        }
1100
1101        let mut query_builder = sqlx::query(&query);
1102        for binding in &bindings {
1103            query_builder = query_builder.bind(binding);
1104        }
1105
1106        let rows = query_builder.fetch_all(&self.pool).await?;
1107
1108        let mut memories = Vec::new();
1109        for row in rows {
1110            let id_str: String = row.get("id");
1111            let entity_id_str: String = row.get("entity_id");
1112            let agent_id_str: String = row.get("agent_id");
1113            let room_id_str: String = row.get("room_id");
1114            let content_str: String = row.get("content");
1115            let metadata_str: Option<String> = row.get("metadata");
1116            let unique_flag: i32 = row.get("unique_flag");
1117
1118            memories.push(Memory {
1119                id: uuid::Uuid::parse_str(&id_str)
1120                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1121                entity_id: uuid::Uuid::parse_str(&entity_id_str)
1122                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1123                agent_id: uuid::Uuid::parse_str(&agent_id_str)
1124                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1125                room_id: uuid::Uuid::parse_str(&room_id_str)
1126                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1127                content: serde_json::from_str(&content_str)?,
1128                embedding: None, // SQLite doesn't store embeddings natively
1129                metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
1130                created_at: row.get("created_at"),
1131                unique: Some(unique_flag != 0),
1132                similarity: None,
1133            });
1134        }
1135
1136        Ok(memories)
1137    }
1138
1139    async fn create_memory(&self, memory: &Memory, _table_name: &str) -> Result<UUID> {
1140        sqlx::query(
1141            "INSERT INTO memories (id, entity_id, agent_id, room_id, content, metadata, created_at, unique_flag)
1142             VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
1143        )
1144        .bind(memory.id.to_string())
1145        .bind(memory.entity_id.to_string())
1146        .bind(memory.agent_id.to_string())
1147        .bind(memory.room_id.to_string())
1148        .bind(serde_json::to_string(&memory.content)?)
1149        .bind(memory.metadata.as_ref().map(|m| serde_json::to_string(m).ok()).flatten())
1150        .bind(memory.created_at)
1151        .bind(if memory.unique.unwrap_or(false) { 1 } else { 0 })
1152        .execute(&self.pool)
1153        .await?;
1154
1155        Ok(memory.id)
1156    }
1157
1158    async fn search_memories_by_embedding(
1159        &self,
1160        _params: SearchMemoriesParams,
1161    ) -> Result<Vec<Memory>> {
1162        // SQLite doesn't have native vector search - would need sqlite-vss extension
1163        not_implemented!(
1164            "search_memories_by_embedding - vector search requires sqlite-vss extension"
1165        )
1166    }
1167
1168    async fn get_cached_embeddings(&self, _params: MemoryQuery) -> Result<Vec<Memory>> {
1169        // No embedding support in basic SQLite
1170        Ok(vec![])
1171    }
1172
1173    async fn update_memory(&self, memory: &Memory) -> Result<bool> {
1174        let result = sqlx::query("UPDATE memories SET content = ?, metadata = ? WHERE id = ?")
1175            .bind(serde_json::to_string(&memory.content)?)
1176            .bind(
1177                memory
1178                    .metadata
1179                    .as_ref()
1180                    .map(|m| serde_json::to_string(m).ok())
1181                    .flatten(),
1182            )
1183            .bind(memory.id.to_string())
1184            .execute(&self.pool)
1185            .await?;
1186
1187        Ok(result.rows_affected() > 0)
1188    }
1189
1190    async fn remove_memory(&self, memory_id: UUID, _table_name: &str) -> Result<bool> {
1191        let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1192            .bind(memory_id.to_string())
1193            .execute(&self.pool)
1194            .await?;
1195
1196        Ok(result.rows_affected() > 0)
1197    }
1198
1199    async fn remove_all_memories(&self, agent_id: UUID, _table_name: &str) -> Result<bool> {
1200        let result = sqlx::query("DELETE FROM memories WHERE agent_id = ?")
1201            .bind(agent_id.to_string())
1202            .execute(&self.pool)
1203            .await?;
1204
1205        Ok(result.rows_affected() > 0)
1206    }
1207
1208    async fn count_memories(&self, params: MemoryQuery) -> Result<usize> {
1209        let mut query = String::from("SELECT COUNT(*) as count FROM memories WHERE 1=1");
1210
1211        let mut bindings: Vec<String> = Vec::new();
1212
1213        if let Some(agent_id) = params.agent_id {
1214            query.push_str(" AND agent_id = ?");
1215            bindings.push(agent_id.to_string());
1216        }
1217        if let Some(room_id) = params.room_id {
1218            query.push_str(" AND room_id = ?");
1219            bindings.push(room_id.to_string());
1220        }
1221        if let Some(entity_id) = params.entity_id {
1222            query.push_str(" AND entity_id = ?");
1223            bindings.push(entity_id.to_string());
1224        }
1225        if let Some(unique) = params.unique {
1226            query.push_str(" AND unique_flag = ?");
1227            bindings.push(if unique {
1228                "1".to_string()
1229            } else {
1230                "0".to_string()
1231            });
1232        }
1233
1234        let mut query_builder = sqlx::query(&query);
1235        for binding in &bindings {
1236            query_builder = query_builder.bind(binding);
1237        }
1238
1239        let row = query_builder.fetch_one(&self.pool).await?;
1240        let count: i64 = row.get("count");
1241
1242        Ok(count as usize)
1243    }
1244
1245    async fn get_world(&self, world_id: UUID) -> Result<Option<World>> {
1246        let row = sqlx::query(
1247            "SELECT id, name, agent_id, server_id, metadata, created_at FROM worlds WHERE id = ?",
1248        )
1249        .bind(world_id.to_string())
1250        .fetch_optional(&self.pool)
1251        .await?;
1252
1253        match row {
1254            Some(row) => {
1255                let id_str: String = row.get("id");
1256                let agent_id_str: String = row.get("agent_id");
1257                let metadata_str: String = row.get("metadata");
1258
1259                Ok(Some(World {
1260                    id: uuid::Uuid::parse_str(&id_str)
1261                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1262                    name: row.get("name"),
1263                    agent_id: uuid::Uuid::parse_str(&agent_id_str)
1264                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1265                    server_id: row.get("server_id"),
1266                    metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1267                    created_at: row.get("created_at"),
1268                }))
1269            }
1270            None => Ok(None),
1271        }
1272    }
1273
1274    async fn ensure_world(&self, world: &World) -> Result<()> {
1275        sqlx::query(
1276            r#"
1277            INSERT INTO worlds (id, name, agent_id, server_id, metadata, created_at)
1278            VALUES (?, ?, ?, ?, ?, ?)
1279            ON CONFLICT (id) DO UPDATE SET
1280                name = excluded.name,
1281                server_id = excluded.server_id,
1282                metadata = excluded.metadata
1283            "#,
1284        )
1285        .bind(world.id.to_string())
1286        .bind(&world.name)
1287        .bind(world.agent_id.to_string())
1288        .bind(&world.server_id)
1289        .bind(serde_json::to_string(&world.metadata)?)
1290        .bind(
1291            world
1292                .created_at
1293                .unwrap_or_else(|| chrono::Utc::now().timestamp()),
1294        )
1295        .execute(&self.pool)
1296        .await?;
1297
1298        Ok(())
1299    }
1300
1301    async fn get_room(&self, room_id: UUID) -> Result<Option<Room>> {
1302        let row = sqlx::query(
1303            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE id = ?",
1304        )
1305        .bind(room_id.to_string())
1306        .fetch_optional(&self.pool)
1307        .await?;
1308
1309        match row {
1310            Some(row) => {
1311                let id_str: String = row.get("id");
1312                let agent_id_str: Option<String> = row.get("agent_id");
1313                let world_id_str: String = row.get("world_id");
1314                let metadata_str: String = row.get("metadata");
1315                let type_str: String = row.get("type");
1316
1317                Ok(Some(Room {
1318                    id: uuid::Uuid::parse_str(&id_str)
1319                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1320                    agent_id: agent_id_str
1321                        .map(|s| uuid::Uuid::parse_str(&s).ok())
1322                        .flatten(),
1323                    name: row.get("name"),
1324                    source: row.get("source"),
1325                    channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1326                        .unwrap_or(ChannelType::Unknown),
1327                    channel_id: row.get("channel_id"),
1328                    server_id: row.get("server_id"),
1329                    world_id: uuid::Uuid::parse_str(&world_id_str)
1330                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1331                    metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1332                    created_at: row.get("created_at"),
1333                }))
1334            }
1335            None => Ok(None),
1336        }
1337    }
1338
1339    async fn create_room(&self, room: &Room) -> Result<UUID> {
1340        let channel_type_str = serde_json::to_string(&room.channel_type)?
1341            .trim_matches('"')
1342            .to_string();
1343
1344        sqlx::query(
1345            r#"
1346            INSERT INTO rooms (id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at)
1347            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1348            "#,
1349        )
1350        .bind(room.id.to_string())
1351        .bind(room.agent_id.map(|id| id.to_string()))
1352        .bind(&room.name)
1353        .bind(&room.source)
1354        .bind(&channel_type_str)
1355        .bind(&room.channel_id)
1356        .bind(&room.server_id)
1357        .bind(room.world_id.to_string())
1358        .bind(serde_json::to_string(&room.metadata)?)
1359        .bind(room.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1360        .execute(&self.pool)
1361        .await?;
1362
1363        Ok(room.id)
1364    }
1365
1366    async fn get_rooms(&self, world_id: UUID) -> Result<Vec<Room>> {
1367        let rows = sqlx::query(
1368            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE world_id = ?",
1369        )
1370        .bind(world_id.to_string())
1371        .fetch_all(&self.pool)
1372        .await?;
1373
1374        let mut rooms = Vec::new();
1375        for row in rows {
1376            let id_str: String = row.get("id");
1377            let agent_id_str: Option<String> = row.get("agent_id");
1378            let world_id_str: String = row.get("world_id");
1379            let metadata_str: String = row.get("metadata");
1380            let type_str: String = row.get("type");
1381
1382            rooms.push(Room {
1383                id: uuid::Uuid::parse_str(&id_str)
1384                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1385                agent_id: agent_id_str
1386                    .map(|s| uuid::Uuid::parse_str(&s).ok())
1387                    .flatten(),
1388                name: row.get("name"),
1389                source: row.get("source"),
1390                channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1391                    .unwrap_or(ChannelType::Unknown),
1392                channel_id: row.get("channel_id"),
1393                server_id: row.get("server_id"),
1394                world_id: uuid::Uuid::parse_str(&world_id_str)
1395                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1396                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1397                created_at: row.get("created_at"),
1398            });
1399        }
1400
1401        Ok(rooms)
1402    }
1403
1404    async fn get_rooms_for_agent(&self, agent_id: UUID) -> Result<Vec<Room>> {
1405        let rows = sqlx::query(
1406            "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE agent_id = ?",
1407        )
1408        .bind(agent_id.to_string())
1409        .fetch_all(&self.pool)
1410        .await?;
1411
1412        let mut rooms = Vec::new();
1413        for row in rows {
1414            let id_str: String = row.get("id");
1415            let agent_id_str: Option<String> = row.get("agent_id");
1416            let world_id_str: String = row.get("world_id");
1417            let metadata_str: String = row.get("metadata");
1418            let type_str: String = row.get("type");
1419
1420            rooms.push(Room {
1421                id: uuid::Uuid::parse_str(&id_str)
1422                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1423                agent_id: agent_id_str
1424                    .map(|s| uuid::Uuid::parse_str(&s).ok())
1425                    .flatten(),
1426                name: row.get("name"),
1427                source: row.get("source"),
1428                channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1429                    .unwrap_or(ChannelType::Unknown),
1430                channel_id: row.get("channel_id"),
1431                server_id: row.get("server_id"),
1432                world_id: uuid::Uuid::parse_str(&world_id_str)
1433                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1434                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1435                created_at: row.get("created_at"),
1436            });
1437        }
1438
1439        Ok(rooms)
1440    }
1441
1442    async fn add_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1443        let result = sqlx::query(
1444            r#"
1445            INSERT INTO participants (entity_id, room_id, joined_at, metadata)
1446            VALUES (?, ?, ?, '{}')
1447            ON CONFLICT (entity_id, room_id) DO NOTHING
1448            "#,
1449        )
1450        .bind(entity_id.to_string())
1451        .bind(room_id.to_string())
1452        .bind(chrono::Utc::now().timestamp())
1453        .execute(&self.pool)
1454        .await?;
1455
1456        Ok(result.rows_affected() > 0)
1457    }
1458
1459    async fn remove_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1460        let result = sqlx::query("DELETE FROM participants WHERE entity_id = ? AND room_id = ?")
1461            .bind(entity_id.to_string())
1462            .bind(room_id.to_string())
1463            .execute(&self.pool)
1464            .await?;
1465
1466        Ok(result.rows_affected() > 0)
1467    }
1468
1469    async fn get_participants(&self, room_id: UUID) -> Result<Vec<Participant>> {
1470        let rows = sqlx::query(
1471            "SELECT entity_id, room_id, joined_at, metadata FROM participants WHERE room_id = ?",
1472        )
1473        .bind(room_id.to_string())
1474        .fetch_all(&self.pool)
1475        .await?;
1476
1477        let mut participants = Vec::new();
1478        for row in rows {
1479            let entity_id_str: String = row.get("entity_id");
1480            let room_id_str: String = row.get("room_id");
1481            let metadata_str: String = row.get("metadata");
1482
1483            participants.push(Participant {
1484                entity_id: uuid::Uuid::parse_str(&entity_id_str)
1485                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1486                room_id: uuid::Uuid::parse_str(&room_id_str)
1487                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1488                joined_at: row.get("joined_at"),
1489                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1490            });
1491        }
1492
1493        Ok(participants)
1494    }
1495
1496    async fn create_relationship(&self, relationship: &Relationship) -> Result<bool> {
1497        let result = sqlx::query(
1498            r#"
1499            INSERT INTO relationships (id, entity_id_a, entity_id_b, type, agent_id, metadata, created_at)
1500            VALUES (?, ?, ?, ?, ?, ?, ?)
1501            ON CONFLICT (entity_id_a, entity_id_b, type) DO UPDATE SET
1502                metadata = excluded.metadata
1503            "#,
1504        )
1505        .bind(uuid::Uuid::new_v4().to_string())
1506        .bind(relationship.entity_id_a.to_string())
1507        .bind(relationship.entity_id_b.to_string())
1508        .bind(&relationship.relationship_type)
1509        .bind(relationship.agent_id.to_string())
1510        .bind(serde_json::to_string(&relationship.metadata)?)
1511        .bind(relationship.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1512        .execute(&self.pool)
1513        .await?;
1514
1515        Ok(result.rows_affected() > 0)
1516    }
1517
1518    async fn get_relationship(
1519        &self,
1520        entity_id_a: UUID,
1521        entity_id_b: UUID,
1522    ) -> Result<Option<Relationship>> {
1523        let row = sqlx::query(
1524            "SELECT entity_id_a, entity_id_b, type, agent_id, metadata, created_at FROM relationships WHERE entity_id_a = ? AND entity_id_b = ?",
1525        )
1526        .bind(entity_id_a.to_string())
1527        .bind(entity_id_b.to_string())
1528        .fetch_optional(&self.pool)
1529        .await?;
1530
1531        match row {
1532            Some(row) => {
1533                let entity_a_str: String = row.get("entity_id_a");
1534                let entity_b_str: String = row.get("entity_id_b");
1535                let agent_id_str: String = row.get("agent_id");
1536                let metadata_str: String = row.get("metadata");
1537
1538                Ok(Some(Relationship {
1539                    entity_id_a: uuid::Uuid::parse_str(&entity_a_str)
1540                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1541                    entity_id_b: uuid::Uuid::parse_str(&entity_b_str)
1542                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1543                    relationship_type: row.get("type"),
1544                    agent_id: uuid::Uuid::parse_str(&agent_id_str)
1545                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1546                    metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1547                    created_at: row.get("created_at"),
1548                }))
1549            }
1550            None => Ok(None),
1551        }
1552    }
1553
1554    async fn create_task(&self, task: &Task) -> Result<UUID> {
1555        let status_str = serde_json::to_string(&task.status)?
1556            .trim_matches('"')
1557            .to_string();
1558
1559        sqlx::query(
1560            r#"
1561            INSERT INTO tasks (id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at)
1562            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1563            "#,
1564        )
1565        .bind(task.id.to_string())
1566        .bind(task.agent_id.to_string())
1567        .bind(&task.task_type)
1568        .bind(serde_json::to_string(&task.data)?)
1569        .bind(&status_str)
1570        .bind(task.priority)
1571        .bind(task.scheduled_at)
1572        .bind(task.executed_at)
1573        .bind(task.retry_count)
1574        .bind(task.max_retries)
1575        .bind(&task.error)
1576        .bind(task.created_at)
1577        .bind(task.updated_at)
1578        .execute(&self.pool)
1579        .await?;
1580
1581        Ok(task.id)
1582    }
1583
1584    async fn update_task(&self, task: &Task) -> Result<bool> {
1585        let status_str = serde_json::to_string(&task.status)?
1586            .trim_matches('"')
1587            .to_string();
1588
1589        let result = sqlx::query(
1590            r#"
1591            UPDATE tasks SET 
1592                status = ?, 
1593                priority = ?, 
1594                scheduled_at = ?, 
1595                executed_at = ?, 
1596                retry_count = ?, 
1597                error = ?, 
1598                updated_at = ?
1599            WHERE id = ?
1600            "#,
1601        )
1602        .bind(&status_str)
1603        .bind(task.priority)
1604        .bind(task.scheduled_at)
1605        .bind(task.executed_at)
1606        .bind(task.retry_count)
1607        .bind(&task.error)
1608        .bind(chrono::Utc::now().timestamp())
1609        .bind(task.id.to_string())
1610        .execute(&self.pool)
1611        .await?;
1612
1613        Ok(result.rows_affected() > 0)
1614    }
1615
1616    async fn get_task(&self, task_id: UUID) -> Result<Option<Task>> {
1617        let row = sqlx::query(
1618            "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at FROM tasks WHERE id = ?",
1619        )
1620        .bind(task_id.to_string())
1621        .fetch_optional(&self.pool)
1622        .await?;
1623
1624        match row {
1625            Some(row) => {
1626                let id_str: String = row.get("id");
1627                let agent_id_str: String = row.get("agent_id");
1628                let data_str: String = row.get("data");
1629                let status_str: String = row.get("status");
1630
1631                Ok(Some(Task {
1632                    id: uuid::Uuid::parse_str(&id_str)
1633                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1634                    agent_id: uuid::Uuid::parse_str(&agent_id_str)
1635                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1636                    task_type: row.get("task_type"),
1637                    data: serde_json::from_str(&data_str)?,
1638                    status: serde_json::from_str(&format!("\"{}\"", status_str))
1639                        .unwrap_or(TaskStatus::Pending),
1640                    priority: row.get("priority"),
1641                    scheduled_at: row.get("scheduled_at"),
1642                    executed_at: row.get("executed_at"),
1643                    retry_count: row.get("retry_count"),
1644                    max_retries: row.get("max_retries"),
1645                    error: row.get("error"),
1646                    created_at: row.get("created_at"),
1647                    updated_at: row.get("updated_at"),
1648                }))
1649            }
1650            None => Ok(None),
1651        }
1652    }
1653
1654    async fn get_pending_tasks(&self, agent_id: UUID) -> Result<Vec<Task>> {
1655        let rows = sqlx::query(
1656            r#"
1657            SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at 
1658            FROM tasks 
1659            WHERE agent_id = ? AND status = 'PENDING'
1660            ORDER BY priority DESC, scheduled_at ASC
1661            "#,
1662        )
1663        .bind(agent_id.to_string())
1664        .fetch_all(&self.pool)
1665        .await?;
1666
1667        let mut tasks = Vec::new();
1668        for row in rows {
1669            let id_str: String = row.get("id");
1670            let agent_id_str: String = row.get("agent_id");
1671            let data_str: String = row.get("data");
1672            let status_str: String = row.get("status");
1673
1674            tasks.push(Task {
1675                id: uuid::Uuid::parse_str(&id_str)
1676                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1677                agent_id: uuid::Uuid::parse_str(&agent_id_str)
1678                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1679                task_type: row.get("task_type"),
1680                data: serde_json::from_str(&data_str)?,
1681                status: serde_json::from_str(&format!("\"{}\"", status_str))
1682                    .unwrap_or(TaskStatus::Pending),
1683                priority: row.get("priority"),
1684                scheduled_at: row.get("scheduled_at"),
1685                executed_at: row.get("executed_at"),
1686                retry_count: row.get("retry_count"),
1687                max_retries: row.get("max_retries"),
1688                error: row.get("error"),
1689                created_at: row.get("created_at"),
1690                updated_at: row.get("updated_at"),
1691            });
1692        }
1693
1694        Ok(tasks)
1695    }
1696
1697    async fn log(&self, log: &Log) -> Result<()> {
1698        let id = log.id.unwrap_or_else(uuid::Uuid::new_v4);
1699
1700        sqlx::query(
1701            r#"
1702            INSERT INTO logs (id, entity_id, room_id, body, type, created_at)
1703            VALUES (?, ?, ?, ?, ?, ?)
1704            "#,
1705        )
1706        .bind(id.to_string())
1707        .bind(log.entity_id.to_string())
1708        .bind(log.room_id.map(|id| id.to_string()))
1709        .bind(serde_json::to_string(&log.body)?)
1710        .bind(&log.log_type)
1711        .bind(log.created_at)
1712        .execute(&self.pool)
1713        .await?;
1714
1715        Ok(())
1716    }
1717
1718    async fn get_logs(&self, params: LogQuery) -> Result<Vec<Log>> {
1719        let mut query = String::from(
1720            "SELECT id, entity_id, room_id, body, type, created_at FROM logs WHERE 1=1",
1721        );
1722        let mut bindings: Vec<String> = Vec::new();
1723
1724        if let Some(entity_id) = params.entity_id {
1725            query.push_str(" AND entity_id = ?");
1726            bindings.push(entity_id.to_string());
1727        }
1728        if let Some(room_id) = params.room_id {
1729            query.push_str(" AND room_id = ?");
1730            bindings.push(room_id.to_string());
1731        }
1732        if let Some(log_type) = params.log_type {
1733            query.push_str(" AND type = ?");
1734            bindings.push(log_type);
1735        }
1736
1737        query.push_str(" ORDER BY created_at DESC");
1738
1739        if let Some(limit) = params.limit {
1740            query.push_str(&format!(" LIMIT {}", limit));
1741        }
1742        if let Some(offset) = params.offset {
1743            query.push_str(&format!(" OFFSET {}", offset));
1744        }
1745
1746        let mut query_builder = sqlx::query(&query);
1747        for binding in &bindings {
1748            query_builder = query_builder.bind(binding);
1749        }
1750
1751        let rows = query_builder.fetch_all(&self.pool).await?;
1752
1753        let mut logs = Vec::new();
1754        for row in rows {
1755            let id_str: String = row.get("id");
1756            let entity_id_str: String = row.get("entity_id");
1757            let room_id_str: Option<String> = row.get("room_id");
1758            let body_str: String = row.get("body");
1759
1760            logs.push(Log {
1761                id: Some(
1762                    uuid::Uuid::parse_str(&id_str)
1763                        .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1764                ),
1765                entity_id: uuid::Uuid::parse_str(&entity_id_str)
1766                    .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1767                room_id: room_id_str
1768                    .map(|s| uuid::Uuid::parse_str(&s).ok())
1769                    .flatten(),
1770                body: serde_json::from_str(&body_str)?,
1771                log_type: row.get("type"),
1772                created_at: row.get("created_at"),
1773            });
1774        }
1775
1776        Ok(logs)
1777    }
1778
1779    async fn get_agent_run_summaries(
1780        &self,
1781        _params: RunSummaryQuery,
1782    ) -> Result<AgentRunSummaryResult> {
1783        // Run summaries require more complex aggregation - return empty result for now
1784        Ok(AgentRunSummaryResult {
1785            runs: vec![],
1786            total: 0,
1787            has_more: false,
1788        })
1789    }
1790}
1791
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` can be shared and moved across threads.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn test_sqlite_adapter_creation() {
        // Compilation test: the previous `assert!(true)` checked nothing.
        // Referencing the adapter type makes the build fail if it stops
        // being usable from multi-threaded async code.
        assert_send_sync::<SqliteAdapter>();
    }
}
1801}