1use async_trait::async_trait;
7use zoey_core::observability::types::LLMCostRecord;
8use zoey_core::{types::*, ZoeyError, Result};
9use sqlx::sqlite::SqliteConnectOptions;
10use sqlx::{Row, SqlitePool};
11use std::str::FromStr;
12use tracing::{debug, info};
13
/// Expands to an `Err(ZoeyError::database(..))` whose message says that
/// `$method` is not implemented by the SQLite adapter and points the caller
/// at PostgreSQL instead.
macro_rules! not_implemented {
    ($method:expr) => {{
        let message = format!(
            "SQLite: {} not implemented. Use PostgreSQL for full functionality",
            $method
        );
        Err(ZoeyError::database(message))
    }};
}
23
/// Database adapter backed by a SQLite connection pool.
pub struct SqliteAdapter {
    /// sqlx pool through which the adapter issues its queries.
    pool: SqlitePool,
}
28
29impl SqliteAdapter {
30 pub async fn new(database_path: &str) -> Result<Self> {
32 info!("Opening SQLite database at: {}", database_path);
33
34 let opts = SqliteConnectOptions::from_str(database_path)
35 .map_err(|e| ZoeyError::database(format!("Invalid SQLite URL: {}", e)))?
36 .create_if_missing(true);
37
38 let pool = SqlitePool::connect_with(opts)
39 .await
40 .map_err(|e| ZoeyError::DatabaseSqlx(e))?;
41
42 Ok(Self { pool })
43 }
44
45 async fn init_schema(&self) -> Result<()> {
47 debug!("Initializing SQLite schema...");
48
49 sqlx::query("PRAGMA foreign_keys = ON")
51 .execute(&self.pool)
52 .await?;
53
54 sqlx::query(
60 r#"
61 CREATE TABLE IF NOT EXISTS agents (
62 id TEXT PRIMARY KEY,
63 name TEXT NOT NULL,
64 character TEXT NOT NULL,
65 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
66 updated_at INTEGER
67 )
68 "#,
69 )
70 .execute(&self.pool)
71 .await?;
72
73 sqlx::query(
75 r#"
76 CREATE TABLE IF NOT EXISTS entities (
77 id TEXT PRIMARY KEY,
78 agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
79 name TEXT,
80 username TEXT,
81 email TEXT,
82 avatar_url TEXT,
83 metadata TEXT DEFAULT '{}',
84 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
85 )
86 "#,
87 )
88 .execute(&self.pool)
89 .await?;
90
91 sqlx::query(
93 r#"
94 CREATE TABLE IF NOT EXISTS worlds (
95 id TEXT PRIMARY KEY,
96 name TEXT NOT NULL,
97 agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
98 server_id TEXT,
99 metadata TEXT DEFAULT '{}',
100 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
101 )
102 "#,
103 )
104 .execute(&self.pool)
105 .await?;
106
107 sqlx::query(
109 r#"
110 CREATE TABLE IF NOT EXISTS rooms (
111 id TEXT PRIMARY KEY,
112 agent_id TEXT REFERENCES agents(id) ON DELETE SET NULL,
113 name TEXT NOT NULL,
114 source TEXT NOT NULL,
115 type TEXT NOT NULL,
116 channel_id TEXT,
117 server_id TEXT,
118 world_id TEXT NOT NULL REFERENCES worlds(id) ON DELETE CASCADE,
119 metadata TEXT DEFAULT '{}',
120 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
121 )
122 "#,
123 )
124 .execute(&self.pool)
125 .await?;
126
127 sqlx::query(
129 r#"
130 CREATE TABLE IF NOT EXISTS memories (
131 id TEXT PRIMARY KEY,
132 entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
133 agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
134 room_id TEXT NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
135 content TEXT NOT NULL,
136 metadata TEXT,
137 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
138 unique_flag INTEGER DEFAULT 0
139 )
140 "#,
141 )
142 .execute(&self.pool)
143 .await?;
144
145 sqlx::query(
147 r#"
148 CREATE TABLE IF NOT EXISTS participants (
149 entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
150 room_id TEXT NOT NULL REFERENCES rooms(id) ON DELETE CASCADE,
151 joined_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
152 metadata TEXT DEFAULT '{}',
153 PRIMARY KEY (entity_id, room_id)
154 )
155 "#,
156 )
157 .execute(&self.pool)
158 .await?;
159
160 sqlx::query(
162 r#"
163 CREATE TABLE IF NOT EXISTS relationships (
164 id TEXT PRIMARY KEY,
165 entity_id_a TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
166 entity_id_b TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
167 type TEXT NOT NULL,
168 agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
169 metadata TEXT DEFAULT '{}',
170 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
171 UNIQUE (entity_id_a, entity_id_b, type)
172 )
173 "#,
174 )
175 .execute(&self.pool)
176 .await?;
177
178 sqlx::query(
180 r#"
181 CREATE TABLE IF NOT EXISTS components (
182 id TEXT PRIMARY KEY,
183 entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
184 world_id TEXT NOT NULL REFERENCES worlds(id) ON DELETE CASCADE,
185 source_entity_id TEXT REFERENCES entities(id) ON DELETE SET NULL,
186 type TEXT NOT NULL,
187 data TEXT NOT NULL,
188 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
189 updated_at INTEGER,
190 UNIQUE (entity_id, world_id, type, source_entity_id)
191 )
192 "#,
193 )
194 .execute(&self.pool)
195 .await?;
196
197 sqlx::query(
199 r#"
200 CREATE TABLE IF NOT EXISTS tasks (
201 id TEXT PRIMARY KEY,
202 agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
203 task_type TEXT NOT NULL,
204 data TEXT NOT NULL,
205 status TEXT NOT NULL DEFAULT 'PENDING',
206 priority INTEGER DEFAULT 0,
207 scheduled_at INTEGER,
208 executed_at INTEGER,
209 retry_count INTEGER DEFAULT 0,
210 max_retries INTEGER DEFAULT 3,
211 error TEXT,
212 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
213 updated_at INTEGER
214 )
215 "#,
216 )
217 .execute(&self.pool)
218 .await?;
219
220 sqlx::query(
222 r#"
223 CREATE TABLE IF NOT EXISTS logs (
224 id TEXT PRIMARY KEY,
225 entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
226 room_id TEXT REFERENCES rooms(id) ON DELETE SET NULL,
227 body TEXT NOT NULL,
228 type TEXT NOT NULL,
229 created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
230 )
231 "#,
232 )
233 .execute(&self.pool)
234 .await?;
235
236 sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_agent_id ON entities(agent_id)")
242 .execute(&self.pool)
243 .await?;
244
245 sqlx::query("CREATE INDEX IF NOT EXISTS idx_entities_username ON entities(username)")
246 .execute(&self.pool)
247 .await?;
248
249 sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_agent_id ON worlds(agent_id)")
251 .execute(&self.pool)
252 .await?;
253
254 sqlx::query("CREATE INDEX IF NOT EXISTS idx_worlds_server_id ON worlds(server_id)")
255 .execute(&self.pool)
256 .await?;
257
258 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_world_id ON rooms(world_id)")
260 .execute(&self.pool)
261 .await?;
262
263 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_agent_id ON rooms(agent_id)")
264 .execute(&self.pool)
265 .await?;
266
267 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_channel_id ON rooms(channel_id)")
268 .execute(&self.pool)
269 .await?;
270
271 sqlx::query(
272 "CREATE INDEX IF NOT EXISTS idx_rooms_source_server ON rooms(source, server_id)",
273 )
274 .execute(&self.pool)
275 .await?;
276
277 sqlx::query("CREATE INDEX IF NOT EXISTS idx_rooms_type ON rooms(type)")
278 .execute(&self.pool)
279 .await?;
280
281 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id)")
283 .execute(&self.pool)
284 .await?;
285
286 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_room_id ON memories(room_id)")
287 .execute(&self.pool)
288 .await?;
289
290 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_entity_id ON memories(entity_id)")
291 .execute(&self.pool)
292 .await?;
293
294 sqlx::query(
295 "CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at DESC)",
296 )
297 .execute(&self.pool)
298 .await?;
299
300 sqlx::query("CREATE INDEX IF NOT EXISTS idx_memories_agent_room_created ON memories(agent_id, room_id, created_at DESC)")
301 .execute(&self.pool)
302 .await?;
303
304 sqlx::query(
306 "CREATE INDEX IF NOT EXISTS idx_participants_entity_id ON participants(entity_id)",
307 )
308 .execute(&self.pool)
309 .await?;
310
311 sqlx::query("CREATE INDEX IF NOT EXISTS idx_participants_room_id ON participants(room_id)")
312 .execute(&self.pool)
313 .await?;
314
315 sqlx::query(
317 "CREATE INDEX IF NOT EXISTS idx_relationships_entity_a ON relationships(entity_id_a)",
318 )
319 .execute(&self.pool)
320 .await?;
321
322 sqlx::query(
323 "CREATE INDEX IF NOT EXISTS idx_relationships_entity_b ON relationships(entity_id_b)",
324 )
325 .execute(&self.pool)
326 .await?;
327
328 sqlx::query(
329 "CREATE INDEX IF NOT EXISTS idx_relationships_agent_id ON relationships(agent_id)",
330 )
331 .execute(&self.pool)
332 .await?;
333
334 sqlx::query("CREATE INDEX IF NOT EXISTS idx_relationships_type ON relationships(type)")
335 .execute(&self.pool)
336 .await?;
337
338 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_entity_id ON components(entity_id)")
340 .execute(&self.pool)
341 .await?;
342
343 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_world_id ON components(world_id)")
344 .execute(&self.pool)
345 .await?;
346
347 sqlx::query("CREATE INDEX IF NOT EXISTS idx_components_type ON components(type)")
348 .execute(&self.pool)
349 .await?;
350
351 sqlx::query(
352 "CREATE INDEX IF NOT EXISTS idx_components_entity_type ON components(entity_id, type)",
353 )
354 .execute(&self.pool)
355 .await?;
356
357 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_id ON tasks(agent_id)")
359 .execute(&self.pool)
360 .await?;
361
362 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
363 .execute(&self.pool)
364 .await?;
365
366 sqlx::query(
367 "CREATE INDEX IF NOT EXISTS idx_tasks_status_scheduled ON tasks(status, scheduled_at)",
368 )
369 .execute(&self.pool)
370 .await?;
371
372 sqlx::query("CREATE INDEX IF NOT EXISTS idx_tasks_agent_status ON tasks(agent_id, status)")
373 .execute(&self.pool)
374 .await?;
375
376 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_id ON logs(entity_id)")
378 .execute(&self.pool)
379 .await?;
380
381 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_room_id ON logs(room_id)")
382 .execute(&self.pool)
383 .await?;
384
385 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_type ON logs(type)")
386 .execute(&self.pool)
387 .await?;
388
389 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_created_at ON logs(created_at DESC)")
390 .execute(&self.pool)
391 .await?;
392
393 sqlx::query("CREATE INDEX IF NOT EXISTS idx_logs_entity_created ON logs(entity_id, created_at DESC)")
394 .execute(&self.pool)
395 .await?;
396
397 sqlx::query(
402 r#"
403 CREATE TABLE IF NOT EXISTS llm_costs (
404 id TEXT PRIMARY KEY,
405 timestamp INTEGER NOT NULL,
406
407 agent_id TEXT NOT NULL,
408 user_id TEXT,
409 conversation_id TEXT,
410 action_name TEXT,
411 evaluator_name TEXT,
412
413 provider TEXT NOT NULL,
414 model TEXT NOT NULL,
415 temperature REAL NOT NULL,
416
417 prompt_tokens INTEGER NOT NULL,
418 completion_tokens INTEGER NOT NULL,
419 total_tokens INTEGER NOT NULL,
420 cached_tokens INTEGER,
421
422 input_cost_usd REAL NOT NULL,
423 output_cost_usd REAL NOT NULL,
424 total_cost_usd REAL NOT NULL,
425
426 latency_ms INTEGER NOT NULL,
427 ttft_ms INTEGER,
428
429 success INTEGER NOT NULL,
430 error TEXT,
431
432 prompt_hash TEXT,
433 prompt_preview TEXT
434 )
435 "#,
436 )
437 .execute(&self.pool)
438 .await?;
439
440 sqlx::query(
441 r#"
442 CREATE TABLE IF NOT EXISTS stored_prompts (
443 id TEXT PRIMARY KEY,
444 timestamp INTEGER NOT NULL,
445 cost_record_id TEXT REFERENCES llm_costs(id) ON DELETE CASCADE,
446 agent_id TEXT NOT NULL,
447 conversation_id TEXT,
448 prompt_hash TEXT NOT NULL,
449 prompt_text TEXT,
450 prompt_length INTEGER NOT NULL,
451 sanitized INTEGER NOT NULL,
452 sanitization_level TEXT NOT NULL,
453 completion_text TEXT,
454 completion_length INTEGER NOT NULL DEFAULT 0,
455 model TEXT NOT NULL,
456 temperature REAL NOT NULL
457 )
458 "#,
459 )
460 .execute(&self.pool)
461 .await?;
462
463 sqlx::query("CREATE INDEX IF NOT EXISTS idx_llm_costs_agent_timestamp ON llm_costs(agent_id, timestamp DESC)")
465 .execute(&self.pool)
466 .await?;
467
468 sqlx::query(
469 "CREATE INDEX IF NOT EXISTS idx_llm_costs_provider_model ON llm_costs(provider, model)",
470 )
471 .execute(&self.pool)
472 .await?;
473
474 sqlx::query(
475 "CREATE INDEX IF NOT EXISTS idx_llm_costs_timestamp ON llm_costs(timestamp DESC)",
476 )
477 .execute(&self.pool)
478 .await?;
479
480 sqlx::query(
481 "CREATE INDEX IF NOT EXISTS idx_llm_costs_conversation ON llm_costs(conversation_id)",
482 )
483 .execute(&self.pool)
484 .await?;
485
486 sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_agent_timestamp ON stored_prompts(agent_id, timestamp DESC)")
487 .execute(&self.pool)
488 .await?;
489
490 sqlx::query(
491 "CREATE INDEX IF NOT EXISTS idx_stored_prompts_hash ON stored_prompts(prompt_hash)",
492 )
493 .execute(&self.pool)
494 .await?;
495
496 sqlx::query("CREATE INDEX IF NOT EXISTS idx_stored_prompts_cost_record ON stored_prompts(cost_record_id)")
497 .execute(&self.pool)
498 .await?;
499
500 info!("SQLite schema initialized successfully");
501 Ok(())
502 }
503}
504
505#[async_trait]
506impl IDatabaseAdapter for SqliteAdapter {
    /// Exposes the underlying connection pool as a type-erased `&dyn Any`.
    fn db(&self) -> &dyn std::any::Any {
        &self.pool
    }
510
    /// Prepares the database by running `init_schema`; `_config` is ignored.
    async fn initialize(&mut self, _config: Option<serde_json::Value>) -> Result<()> {
        self.init_schema().await
    }
514
515 async fn is_ready(&self) -> Result<bool> {
516 match sqlx::query("SELECT 1").fetch_one(&self.pool).await {
517 Ok(_) => Ok(true),
518 Err(_) => Ok(false),
519 }
520 }
521
    /// Closes the connection pool and always reports success.
    async fn close(&mut self) -> Result<()> {
        self.pool.close().await;
        Ok(())
    }
526
527 async fn get_connection(&self) -> Result<Box<dyn std::any::Any + Send>> {
528 Ok(Box::new(self.pool.clone()))
529 }
530
531 async fn run_plugin_migrations(
532 &self,
533 plugins: Vec<PluginMigration>,
534 options: MigrationOptions,
535 ) -> Result<()> {
536 if plugins.is_empty() {
537 return Ok(());
538 }
539
540 fn validate_identifier(name: &str) -> Result<&str> {
542 if name.len() > 64 {
543 return Err(ZoeyError::validation(format!(
544 "Identifier too long: {} (max 64 characters)",
545 name.len()
546 )));
547 }
548 if !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
549 return Err(ZoeyError::validation(format!(
550 "Invalid identifier '{}': only alphanumeric and underscore allowed",
551 name
552 )));
553 }
554 Ok(name)
555 }
556
557 for plugin in plugins {
558 if let Some(schema) = plugin.schema {
559 if options.verbose {
560 tracing::info!("Applying schema for plugin '{}' (SQLite)", plugin.name);
561 }
562
563 let schema_obj = schema.as_object().ok_or_else(|| {
564 ZoeyError::validation(format!(
565 "Invalid schema for plugin '{}': expected JSON object",
566 plugin.name
567 ))
568 })?;
569
570 for (table_name, table_def) in schema_obj.iter() {
571 let table_name = validate_identifier(table_name)?;
572
573 let columns_obj = table_def
574 .get("columns")
575 .and_then(|v| v.as_object())
576 .ok_or_else(|| {
577 ZoeyError::validation(format!(
578 "Invalid table definition for '{}': missing 'columns'",
579 table_name
580 ))
581 })?;
582
583 let mut cols_sql: Vec<String> = Vec::new();
584 for (col_name, col_type_val) in columns_obj.iter() {
585 let col_name = validate_identifier(col_name)?;
586 let mut col_type = col_type_val
587 .as_str()
588 .ok_or_else(|| {
589 ZoeyError::validation(format!(
590 "Invalid type for column '{}.{}'",
591 table_name, col_name
592 ))
593 })?
594 .to_string();
595
596 col_type = col_type
598 .replace("UUID", "TEXT")
599 .replace("TIMESTAMP", "INTEGER")
600 .replace("JSONB", "TEXT")
601 .replace("BOOLEAN", "INTEGER")
602 .replace("TEXT", "TEXT");
603
604 cols_sql.push(format!("{} {}", col_name, col_type));
605 }
606
607 let create_sql = format!(
608 "CREATE TABLE IF NOT EXISTS {} ({})",
609 table_name,
610 cols_sql.join(", ")
611 );
612
613 if options.verbose {
614 tracing::debug!("{}", create_sql);
615 }
616 if !options.dry_run {
617 sqlx::query(&create_sql).execute(&self.pool).await?;
618 }
619 }
620
621 if options.verbose {
622 tracing::info!("✓ Schema applied for plugin '{}' (SQLite)", plugin.name);
623 }
624 }
625 }
626 Ok(())
627 }
628
629 async fn persist_llm_cost(&self, record: LLMCostRecord) -> zoey_core::Result<()> {
630 sqlx::query(
631 r#"
632 INSERT INTO llm_costs (
633 id, timestamp, agent_id, user_id, conversation_id, action_name, evaluator_name,
634 provider, model, temperature, prompt_tokens, completion_tokens, total_tokens, cached_tokens,
635 input_cost_usd, output_cost_usd, total_cost_usd, latency_ms, ttft_ms,
636 success, error, prompt_hash, prompt_preview
637 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
638 "#
639 )
640 .bind(record.id.to_string())
641 .bind(record.timestamp.timestamp())
642 .bind(record.agent_id.to_string())
643 .bind(record.user_id)
644 .bind(record.conversation_id.map(|v| v.to_string()))
645 .bind(record.action_name)
646 .bind(record.evaluator_name)
647 .bind(record.provider)
648 .bind(record.model)
649 .bind(record.temperature)
650 .bind(record.prompt_tokens as i64)
651 .bind(record.completion_tokens as i64)
652 .bind(record.total_tokens as i64)
653 .bind(record.cached_tokens.map(|v| v as i64))
654 .bind(record.input_cost_usd)
655 .bind(record.output_cost_usd)
656 .bind(record.total_cost_usd)
657 .bind(record.latency_ms as i64)
658 .bind(record.ttft_ms.map(|v| v as i64))
659 .bind(if record.success { 1 } else { 0 })
660 .bind(record.error)
661 .bind(record.prompt_hash)
662 .bind(record.prompt_preview)
663 .execute(&self.pool)
664 .await?;
665
666 Ok(())
667 }
668
669 async fn get_agent(&self, agent_id: UUID) -> Result<Option<Agent>> {
672 let row = sqlx::query(
673 "SELECT id, name, character, created_at, updated_at FROM agents WHERE id = ?",
674 )
675 .bind(agent_id.to_string())
676 .fetch_optional(&self.pool)
677 .await?;
678
679 match row {
680 Some(row) => {
681 let id_str: String = row.get("id");
682 let character_str: String = row.get("character");
683
684 Ok(Some(Agent {
685 id: uuid::Uuid::parse_str(&id_str)
686 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
687 name: row.get("name"),
688 character: serde_json::from_str(&character_str)?,
689 created_at: row.get("created_at"),
690 updated_at: row.get("updated_at"),
691 }))
692 }
693 None => Ok(None),
694 }
695 }
696
697 async fn get_agents(&self) -> Result<Vec<Agent>> {
698 let rows = sqlx::query("SELECT id, name, character, created_at, updated_at FROM agents")
699 .fetch_all(&self.pool)
700 .await?;
701
702 let mut agents = Vec::new();
703 for row in rows {
704 let id_str: String = row.get("id");
705 let character_str: String = row.get("character");
706
707 agents.push(Agent {
708 id: uuid::Uuid::parse_str(&id_str)
709 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
710 name: row.get("name"),
711 character: serde_json::from_str(&character_str)?,
712 created_at: row.get("created_at"),
713 updated_at: row.get("updated_at"),
714 });
715 }
716
717 Ok(agents)
718 }
719
720 async fn create_agent(&self, agent: &Agent) -> Result<bool> {
721 let result = sqlx::query(
722 "INSERT INTO agents (id, name, character, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"
723 )
724 .bind(agent.id.to_string())
725 .bind(&agent.name)
726 .bind(serde_json::to_string(&agent.character)?)
727 .bind(agent.created_at)
728 .bind(agent.updated_at)
729 .execute(&self.pool)
730 .await?;
731
732 Ok(result.rows_affected() > 0)
733 }
734
735 async fn update_agent(&self, agent_id: UUID, agent: &Agent) -> Result<bool> {
736 let result =
737 sqlx::query("UPDATE agents SET name = ?, character = ?, updated_at = ? WHERE id = ?")
738 .bind(&agent.name)
739 .bind(serde_json::to_string(&agent.character)?)
740 .bind(chrono::Utc::now().timestamp())
741 .bind(agent_id.to_string())
742 .execute(&self.pool)
743 .await?;
744
745 Ok(result.rows_affected() > 0)
746 }
747
748 async fn delete_agent(&self, agent_id: UUID) -> Result<bool> {
749 let result = sqlx::query("DELETE FROM agents WHERE id = ?")
750 .bind(agent_id.to_string())
751 .execute(&self.pool)
752 .await?;
753
754 Ok(result.rows_affected() > 0)
755 }
756
    /// No-op for this adapter: `_dimension` is ignored and `Ok(())` is returned.
    async fn ensure_embedding_dimension(&self, _dimension: usize) -> Result<()> {
        Ok(())
    }
762
763 async fn get_entities_by_ids(&self, entity_ids: Vec<UUID>) -> Result<Vec<Entity>> {
764 if entity_ids.is_empty() {
765 return Ok(vec![]);
766 }
767
768 let placeholders = entity_ids
769 .iter()
770 .map(|_| "?")
771 .collect::<Vec<_>>()
772 .join(", ");
773 let query = format!("SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id IN ({})", placeholders);
774
775 let mut query_builder = sqlx::query(&query);
776 for id in &entity_ids {
777 query_builder = query_builder.bind(id.to_string());
778 }
779
780 let rows = query_builder.fetch_all(&self.pool).await?;
781
782 let mut entities = Vec::new();
783 for row in rows {
784 let id_str: String = row.get("id");
785 let agent_id_str: String = row.get("agent_id");
786 let metadata_str: String = row.get("metadata");
787
788 entities.push(Entity {
789 id: uuid::Uuid::parse_str(&id_str)
790 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
791 agent_id: uuid::Uuid::parse_str(&agent_id_str)
792 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
793 name: row.get("name"),
794 username: row.get("username"),
795 email: row.get("email"),
796 avatar_url: row.get("avatar_url"),
797 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
798 created_at: row.get("created_at"),
799 });
800 }
801
802 Ok(entities)
803 }
804
805 async fn get_entities_for_room(
806 &self,
807 room_id: UUID,
808 _include_components: bool,
809 ) -> Result<Vec<Entity>> {
810 let rows = sqlx::query(
811 r#"
812 SELECT e.id, e.agent_id, e.name, e.username, e.email, e.avatar_url, e.metadata, e.created_at
813 FROM entities e
814 INNER JOIN participants p ON e.id = p.entity_id
815 WHERE p.room_id = ?
816 "#,
817 )
818 .bind(room_id.to_string())
819 .fetch_all(&self.pool)
820 .await?;
821
822 let mut entities = Vec::new();
823 for row in rows {
824 let id_str: String = row.get("id");
825 let agent_id_str: String = row.get("agent_id");
826 let metadata_str: String = row.get("metadata");
827
828 entities.push(Entity {
829 id: uuid::Uuid::parse_str(&id_str)
830 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
831 agent_id: uuid::Uuid::parse_str(&agent_id_str)
832 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
833 name: row.get("name"),
834 username: row.get("username"),
835 email: row.get("email"),
836 avatar_url: row.get("avatar_url"),
837 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
838 created_at: row.get("created_at"),
839 });
840 }
841
842 Ok(entities)
843 }
844
845 async fn create_entities(&self, entities: Vec<Entity>) -> Result<bool> {
846 for entity in entities {
847 sqlx::query(
848 "INSERT OR REPLACE INTO entities (id, agent_id, name, username, email, avatar_url, metadata, created_at)
849 VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
850 )
851 .bind(entity.id.to_string())
852 .bind(entity.agent_id.to_string())
853 .bind(&entity.name)
854 .bind(&entity.username)
855 .bind(&entity.email)
856 .bind(&entity.avatar_url)
857 .bind(serde_json::to_string(&entity.metadata)?)
858 .bind(entity.created_at)
859 .execute(&self.pool)
860 .await?;
861 }
862 Ok(true)
863 }
864
865 async fn update_entity(&self, entity: &Entity) -> Result<()> {
866 sqlx::query(
867 "UPDATE entities SET name = ?, username = ?, email = ?, avatar_url = ?, metadata = ? WHERE id = ?"
868 )
869 .bind(&entity.name)
870 .bind(&entity.username)
871 .bind(&entity.email)
872 .bind(&entity.avatar_url)
873 .bind(serde_json::to_string(&entity.metadata)?)
874 .bind(entity.id.to_string())
875 .execute(&self.pool)
876 .await?;
877
878 Ok(())
879 }
880
881 async fn get_entity_by_id(&self, entity_id: UUID) -> Result<Option<Entity>> {
882 let row = sqlx::query(
883 "SELECT id, agent_id, name, username, email, avatar_url, metadata, created_at FROM entities WHERE id = ?"
884 )
885 .bind(entity_id.to_string())
886 .fetch_optional(&self.pool)
887 .await?;
888
889 match row {
890 Some(row) => {
891 let id_str: String = row.get("id");
892 let agent_id_str: String = row.get("agent_id");
893 let metadata_str: String = row.get("metadata");
894
895 Ok(Some(Entity {
896 id: uuid::Uuid::parse_str(&id_str)
897 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
898 agent_id: uuid::Uuid::parse_str(&agent_id_str)
899 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
900 name: row.get("name"),
901 username: row.get("username"),
902 email: row.get("email"),
903 avatar_url: row.get("avatar_url"),
904 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
905 created_at: row.get("created_at"),
906 }))
907 }
908 None => Ok(None),
909 }
910 }
911
912 async fn get_component(
913 &self,
914 entity_id: UUID,
915 component_type: &str,
916 world_id: Option<UUID>,
917 source_entity_id: Option<UUID>,
918 ) -> Result<Option<Component>> {
919 let mut query = String::from(
920 "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = ? AND type = ?",
921 );
922 let mut bindings: Vec<String> = vec![entity_id.to_string(), component_type.to_string()];
923
924 if let Some(wid) = world_id {
925 query.push_str(" AND world_id = ?");
926 bindings.push(wid.to_string());
927 }
928 if let Some(sid) = source_entity_id {
929 query.push_str(" AND source_entity_id = ?");
930 bindings.push(sid.to_string());
931 } else {
932 query.push_str(" AND source_entity_id IS NULL");
933 }
934
935 let mut query_builder = sqlx::query(&query);
936 for binding in &bindings {
937 query_builder = query_builder.bind(binding);
938 }
939
940 let row = query_builder.fetch_optional(&self.pool).await?;
941
942 match row {
943 Some(row) => {
944 let id_str: String = row.get("id");
945 let entity_id_str: String = row.get("entity_id");
946 let world_id_str: String = row.get("world_id");
947 let source_entity_id_str: Option<String> = row.get("source_entity_id");
948 let data_str: String = row.get("data");
949
950 Ok(Some(Component {
951 id: uuid::Uuid::parse_str(&id_str)
952 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
953 entity_id: uuid::Uuid::parse_str(&entity_id_str)
954 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
955 world_id: uuid::Uuid::parse_str(&world_id_str)
956 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
957 source_entity_id: source_entity_id_str
958 .map(|s| uuid::Uuid::parse_str(&s).ok())
959 .flatten(),
960 component_type: row.get("type"),
961 data: serde_json::from_str(&data_str)?,
962 created_at: row.get("created_at"),
963 updated_at: row.get("updated_at"),
964 }))
965 }
966 None => Ok(None),
967 }
968 }
969
970 async fn get_components(
971 &self,
972 entity_id: UUID,
973 world_id: Option<UUID>,
974 source_entity_id: Option<UUID>,
975 ) -> Result<Vec<Component>> {
976 let mut query = String::from(
977 "SELECT id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at FROM components WHERE entity_id = ?",
978 );
979 let mut bindings: Vec<String> = vec![entity_id.to_string()];
980
981 if let Some(wid) = world_id {
982 query.push_str(" AND world_id = ?");
983 bindings.push(wid.to_string());
984 }
985 if let Some(sid) = source_entity_id {
986 query.push_str(" AND source_entity_id = ?");
987 bindings.push(sid.to_string());
988 }
989
990 let mut query_builder = sqlx::query(&query);
991 for binding in &bindings {
992 query_builder = query_builder.bind(binding);
993 }
994
995 let rows = query_builder.fetch_all(&self.pool).await?;
996
997 let mut components = Vec::new();
998 for row in rows {
999 let id_str: String = row.get("id");
1000 let entity_id_str: String = row.get("entity_id");
1001 let world_id_str: String = row.get("world_id");
1002 let source_entity_id_str: Option<String> = row.get("source_entity_id");
1003 let data_str: String = row.get("data");
1004
1005 components.push(Component {
1006 id: uuid::Uuid::parse_str(&id_str)
1007 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1008 entity_id: uuid::Uuid::parse_str(&entity_id_str)
1009 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1010 world_id: uuid::Uuid::parse_str(&world_id_str)
1011 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1012 source_entity_id: source_entity_id_str
1013 .map(|s| uuid::Uuid::parse_str(&s).ok())
1014 .flatten(),
1015 component_type: row.get("type"),
1016 data: serde_json::from_str(&data_str)?,
1017 created_at: row.get("created_at"),
1018 updated_at: row.get("updated_at"),
1019 });
1020 }
1021
1022 Ok(components)
1023 }
1024
1025 async fn create_component(&self, component: &Component) -> Result<bool> {
1026 let result = sqlx::query(
1027 r#"
1028 INSERT INTO components (id, entity_id, world_id, source_entity_id, type, data, created_at, updated_at)
1029 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1030 ON CONFLICT (entity_id, world_id, type, source_entity_id) DO UPDATE SET
1031 data = excluded.data,
1032 updated_at = excluded.updated_at
1033 "#,
1034 )
1035 .bind(component.id.to_string())
1036 .bind(component.entity_id.to_string())
1037 .bind(component.world_id.to_string())
1038 .bind(component.source_entity_id.map(|id| id.to_string()))
1039 .bind(&component.component_type)
1040 .bind(serde_json::to_string(&component.data)?)
1041 .bind(component.created_at)
1042 .bind(component.updated_at)
1043 .execute(&self.pool)
1044 .await?;
1045
1046 Ok(result.rows_affected() > 0)
1047 }
1048
1049 async fn update_component(&self, component: &Component) -> Result<()> {
1050 sqlx::query("UPDATE components SET data = ?, updated_at = ? WHERE id = ?")
1051 .bind(serde_json::to_string(&component.data)?)
1052 .bind(chrono::Utc::now().timestamp())
1053 .bind(component.id.to_string())
1054 .execute(&self.pool)
1055 .await?;
1056
1057 Ok(())
1058 }
1059
1060 async fn delete_component(&self, component_id: UUID) -> Result<()> {
1061 sqlx::query("DELETE FROM components WHERE id = ?")
1062 .bind(component_id.to_string())
1063 .execute(&self.pool)
1064 .await?;
1065
1066 Ok(())
1067 }
1068
1069 async fn get_memories(&self, params: MemoryQuery) -> Result<Vec<Memory>> {
1070 let mut query = String::from("SELECT id, entity_id, agent_id, room_id, content, metadata, created_at, unique_flag FROM memories WHERE 1=1");
1071
1072 let mut bindings: Vec<String> = Vec::new();
1073
1074 if let Some(agent_id) = params.agent_id {
1075 query.push_str(" AND agent_id = ?");
1076 bindings.push(agent_id.to_string());
1077 }
1078 if let Some(room_id) = params.room_id {
1079 query.push_str(" AND room_id = ?");
1080 bindings.push(room_id.to_string());
1081 }
1082 if let Some(entity_id) = params.entity_id {
1083 query.push_str(" AND entity_id = ?");
1084 bindings.push(entity_id.to_string());
1085 }
1086 if let Some(unique) = params.unique {
1087 query.push_str(" AND unique_flag = ?");
1088 bindings.push(if unique {
1089 "1".to_string()
1090 } else {
1091 "0".to_string()
1092 });
1093 }
1094
1095 query.push_str(" ORDER BY created_at DESC");
1096
1097 if let Some(count) = params.count {
1098 query.push_str(&format!(" LIMIT {}", count));
1099 }
1100
1101 let mut query_builder = sqlx::query(&query);
1102 for binding in &bindings {
1103 query_builder = query_builder.bind(binding);
1104 }
1105
1106 let rows = query_builder.fetch_all(&self.pool).await?;
1107
1108 let mut memories = Vec::new();
1109 for row in rows {
1110 let id_str: String = row.get("id");
1111 let entity_id_str: String = row.get("entity_id");
1112 let agent_id_str: String = row.get("agent_id");
1113 let room_id_str: String = row.get("room_id");
1114 let content_str: String = row.get("content");
1115 let metadata_str: Option<String> = row.get("metadata");
1116 let unique_flag: i32 = row.get("unique_flag");
1117
1118 memories.push(Memory {
1119 id: uuid::Uuid::parse_str(&id_str)
1120 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1121 entity_id: uuid::Uuid::parse_str(&entity_id_str)
1122 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1123 agent_id: uuid::Uuid::parse_str(&agent_id_str)
1124 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1125 room_id: uuid::Uuid::parse_str(&room_id_str)
1126 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1127 content: serde_json::from_str(&content_str)?,
1128 embedding: None, metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
1130 created_at: row.get("created_at"),
1131 unique: Some(unique_flag != 0),
1132 similarity: None,
1133 });
1134 }
1135
1136 Ok(memories)
1137 }
1138
1139 async fn create_memory(&self, memory: &Memory, _table_name: &str) -> Result<UUID> {
1140 sqlx::query(
1141 "INSERT INTO memories (id, entity_id, agent_id, room_id, content, metadata, created_at, unique_flag)
1142 VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
1143 )
1144 .bind(memory.id.to_string())
1145 .bind(memory.entity_id.to_string())
1146 .bind(memory.agent_id.to_string())
1147 .bind(memory.room_id.to_string())
1148 .bind(serde_json::to_string(&memory.content)?)
1149 .bind(memory.metadata.as_ref().map(|m| serde_json::to_string(m).ok()).flatten())
1150 .bind(memory.created_at)
1151 .bind(if memory.unique.unwrap_or(false) { 1 } else { 0 })
1152 .execute(&self.pool)
1153 .await?;
1154
1155 Ok(memory.id)
1156 }
1157
1158 async fn search_memories_by_embedding(
1159 &self,
1160 _params: SearchMemoriesParams,
1161 ) -> Result<Vec<Memory>> {
1162 not_implemented!(
1164 "search_memories_by_embedding - vector search requires sqlite-vss extension"
1165 )
1166 }
1167
1168 async fn get_cached_embeddings(&self, _params: MemoryQuery) -> Result<Vec<Memory>> {
1169 Ok(vec![])
1171 }
1172
1173 async fn update_memory(&self, memory: &Memory) -> Result<bool> {
1174 let result = sqlx::query("UPDATE memories SET content = ?, metadata = ? WHERE id = ?")
1175 .bind(serde_json::to_string(&memory.content)?)
1176 .bind(
1177 memory
1178 .metadata
1179 .as_ref()
1180 .map(|m| serde_json::to_string(m).ok())
1181 .flatten(),
1182 )
1183 .bind(memory.id.to_string())
1184 .execute(&self.pool)
1185 .await?;
1186
1187 Ok(result.rows_affected() > 0)
1188 }
1189
1190 async fn remove_memory(&self, memory_id: UUID, _table_name: &str) -> Result<bool> {
1191 let result = sqlx::query("DELETE FROM memories WHERE id = ?")
1192 .bind(memory_id.to_string())
1193 .execute(&self.pool)
1194 .await?;
1195
1196 Ok(result.rows_affected() > 0)
1197 }
1198
1199 async fn remove_all_memories(&self, agent_id: UUID, _table_name: &str) -> Result<bool> {
1200 let result = sqlx::query("DELETE FROM memories WHERE agent_id = ?")
1201 .bind(agent_id.to_string())
1202 .execute(&self.pool)
1203 .await?;
1204
1205 Ok(result.rows_affected() > 0)
1206 }
1207
1208 async fn count_memories(&self, params: MemoryQuery) -> Result<usize> {
1209 let mut query = String::from("SELECT COUNT(*) as count FROM memories WHERE 1=1");
1210
1211 let mut bindings: Vec<String> = Vec::new();
1212
1213 if let Some(agent_id) = params.agent_id {
1214 query.push_str(" AND agent_id = ?");
1215 bindings.push(agent_id.to_string());
1216 }
1217 if let Some(room_id) = params.room_id {
1218 query.push_str(" AND room_id = ?");
1219 bindings.push(room_id.to_string());
1220 }
1221 if let Some(entity_id) = params.entity_id {
1222 query.push_str(" AND entity_id = ?");
1223 bindings.push(entity_id.to_string());
1224 }
1225 if let Some(unique) = params.unique {
1226 query.push_str(" AND unique_flag = ?");
1227 bindings.push(if unique {
1228 "1".to_string()
1229 } else {
1230 "0".to_string()
1231 });
1232 }
1233
1234 let mut query_builder = sqlx::query(&query);
1235 for binding in &bindings {
1236 query_builder = query_builder.bind(binding);
1237 }
1238
1239 let row = query_builder.fetch_one(&self.pool).await?;
1240 let count: i64 = row.get("count");
1241
1242 Ok(count as usize)
1243 }
1244
1245 async fn get_world(&self, world_id: UUID) -> Result<Option<World>> {
1246 let row = sqlx::query(
1247 "SELECT id, name, agent_id, server_id, metadata, created_at FROM worlds WHERE id = ?",
1248 )
1249 .bind(world_id.to_string())
1250 .fetch_optional(&self.pool)
1251 .await?;
1252
1253 match row {
1254 Some(row) => {
1255 let id_str: String = row.get("id");
1256 let agent_id_str: String = row.get("agent_id");
1257 let metadata_str: String = row.get("metadata");
1258
1259 Ok(Some(World {
1260 id: uuid::Uuid::parse_str(&id_str)
1261 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1262 name: row.get("name"),
1263 agent_id: uuid::Uuid::parse_str(&agent_id_str)
1264 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1265 server_id: row.get("server_id"),
1266 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1267 created_at: row.get("created_at"),
1268 }))
1269 }
1270 None => Ok(None),
1271 }
1272 }
1273
1274 async fn ensure_world(&self, world: &World) -> Result<()> {
1275 sqlx::query(
1276 r#"
1277 INSERT INTO worlds (id, name, agent_id, server_id, metadata, created_at)
1278 VALUES (?, ?, ?, ?, ?, ?)
1279 ON CONFLICT (id) DO UPDATE SET
1280 name = excluded.name,
1281 server_id = excluded.server_id,
1282 metadata = excluded.metadata
1283 "#,
1284 )
1285 .bind(world.id.to_string())
1286 .bind(&world.name)
1287 .bind(world.agent_id.to_string())
1288 .bind(&world.server_id)
1289 .bind(serde_json::to_string(&world.metadata)?)
1290 .bind(
1291 world
1292 .created_at
1293 .unwrap_or_else(|| chrono::Utc::now().timestamp()),
1294 )
1295 .execute(&self.pool)
1296 .await?;
1297
1298 Ok(())
1299 }
1300
1301 async fn get_room(&self, room_id: UUID) -> Result<Option<Room>> {
1302 let row = sqlx::query(
1303 "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE id = ?",
1304 )
1305 .bind(room_id.to_string())
1306 .fetch_optional(&self.pool)
1307 .await?;
1308
1309 match row {
1310 Some(row) => {
1311 let id_str: String = row.get("id");
1312 let agent_id_str: Option<String> = row.get("agent_id");
1313 let world_id_str: String = row.get("world_id");
1314 let metadata_str: String = row.get("metadata");
1315 let type_str: String = row.get("type");
1316
1317 Ok(Some(Room {
1318 id: uuid::Uuid::parse_str(&id_str)
1319 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1320 agent_id: agent_id_str
1321 .map(|s| uuid::Uuid::parse_str(&s).ok())
1322 .flatten(),
1323 name: row.get("name"),
1324 source: row.get("source"),
1325 channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1326 .unwrap_or(ChannelType::Unknown),
1327 channel_id: row.get("channel_id"),
1328 server_id: row.get("server_id"),
1329 world_id: uuid::Uuid::parse_str(&world_id_str)
1330 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1331 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1332 created_at: row.get("created_at"),
1333 }))
1334 }
1335 None => Ok(None),
1336 }
1337 }
1338
1339 async fn create_room(&self, room: &Room) -> Result<UUID> {
1340 let channel_type_str = serde_json::to_string(&room.channel_type)?
1341 .trim_matches('"')
1342 .to_string();
1343
1344 sqlx::query(
1345 r#"
1346 INSERT INTO rooms (id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at)
1347 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1348 "#,
1349 )
1350 .bind(room.id.to_string())
1351 .bind(room.agent_id.map(|id| id.to_string()))
1352 .bind(&room.name)
1353 .bind(&room.source)
1354 .bind(&channel_type_str)
1355 .bind(&room.channel_id)
1356 .bind(&room.server_id)
1357 .bind(room.world_id.to_string())
1358 .bind(serde_json::to_string(&room.metadata)?)
1359 .bind(room.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1360 .execute(&self.pool)
1361 .await?;
1362
1363 Ok(room.id)
1364 }
1365
1366 async fn get_rooms(&self, world_id: UUID) -> Result<Vec<Room>> {
1367 let rows = sqlx::query(
1368 "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE world_id = ?",
1369 )
1370 .bind(world_id.to_string())
1371 .fetch_all(&self.pool)
1372 .await?;
1373
1374 let mut rooms = Vec::new();
1375 for row in rows {
1376 let id_str: String = row.get("id");
1377 let agent_id_str: Option<String> = row.get("agent_id");
1378 let world_id_str: String = row.get("world_id");
1379 let metadata_str: String = row.get("metadata");
1380 let type_str: String = row.get("type");
1381
1382 rooms.push(Room {
1383 id: uuid::Uuid::parse_str(&id_str)
1384 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1385 agent_id: agent_id_str
1386 .map(|s| uuid::Uuid::parse_str(&s).ok())
1387 .flatten(),
1388 name: row.get("name"),
1389 source: row.get("source"),
1390 channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1391 .unwrap_or(ChannelType::Unknown),
1392 channel_id: row.get("channel_id"),
1393 server_id: row.get("server_id"),
1394 world_id: uuid::Uuid::parse_str(&world_id_str)
1395 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1396 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1397 created_at: row.get("created_at"),
1398 });
1399 }
1400
1401 Ok(rooms)
1402 }
1403
1404 async fn get_rooms_for_agent(&self, agent_id: UUID) -> Result<Vec<Room>> {
1405 let rows = sqlx::query(
1406 "SELECT id, agent_id, name, source, type, channel_id, server_id, world_id, metadata, created_at FROM rooms WHERE agent_id = ?",
1407 )
1408 .bind(agent_id.to_string())
1409 .fetch_all(&self.pool)
1410 .await?;
1411
1412 let mut rooms = Vec::new();
1413 for row in rows {
1414 let id_str: String = row.get("id");
1415 let agent_id_str: Option<String> = row.get("agent_id");
1416 let world_id_str: String = row.get("world_id");
1417 let metadata_str: String = row.get("metadata");
1418 let type_str: String = row.get("type");
1419
1420 rooms.push(Room {
1421 id: uuid::Uuid::parse_str(&id_str)
1422 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1423 agent_id: agent_id_str
1424 .map(|s| uuid::Uuid::parse_str(&s).ok())
1425 .flatten(),
1426 name: row.get("name"),
1427 source: row.get("source"),
1428 channel_type: serde_json::from_str(&format!("\"{}\"", type_str))
1429 .unwrap_or(ChannelType::Unknown),
1430 channel_id: row.get("channel_id"),
1431 server_id: row.get("server_id"),
1432 world_id: uuid::Uuid::parse_str(&world_id_str)
1433 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1434 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1435 created_at: row.get("created_at"),
1436 });
1437 }
1438
1439 Ok(rooms)
1440 }
1441
1442 async fn add_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1443 let result = sqlx::query(
1444 r#"
1445 INSERT INTO participants (entity_id, room_id, joined_at, metadata)
1446 VALUES (?, ?, ?, '{}')
1447 ON CONFLICT (entity_id, room_id) DO NOTHING
1448 "#,
1449 )
1450 .bind(entity_id.to_string())
1451 .bind(room_id.to_string())
1452 .bind(chrono::Utc::now().timestamp())
1453 .execute(&self.pool)
1454 .await?;
1455
1456 Ok(result.rows_affected() > 0)
1457 }
1458
1459 async fn remove_participant(&self, entity_id: UUID, room_id: UUID) -> Result<bool> {
1460 let result = sqlx::query("DELETE FROM participants WHERE entity_id = ? AND room_id = ?")
1461 .bind(entity_id.to_string())
1462 .bind(room_id.to_string())
1463 .execute(&self.pool)
1464 .await?;
1465
1466 Ok(result.rows_affected() > 0)
1467 }
1468
1469 async fn get_participants(&self, room_id: UUID) -> Result<Vec<Participant>> {
1470 let rows = sqlx::query(
1471 "SELECT entity_id, room_id, joined_at, metadata FROM participants WHERE room_id = ?",
1472 )
1473 .bind(room_id.to_string())
1474 .fetch_all(&self.pool)
1475 .await?;
1476
1477 let mut participants = Vec::new();
1478 for row in rows {
1479 let entity_id_str: String = row.get("entity_id");
1480 let room_id_str: String = row.get("room_id");
1481 let metadata_str: String = row.get("metadata");
1482
1483 participants.push(Participant {
1484 entity_id: uuid::Uuid::parse_str(&entity_id_str)
1485 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1486 room_id: uuid::Uuid::parse_str(&room_id_str)
1487 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1488 joined_at: row.get("joined_at"),
1489 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1490 });
1491 }
1492
1493 Ok(participants)
1494 }
1495
1496 async fn create_relationship(&self, relationship: &Relationship) -> Result<bool> {
1497 let result = sqlx::query(
1498 r#"
1499 INSERT INTO relationships (id, entity_id_a, entity_id_b, type, agent_id, metadata, created_at)
1500 VALUES (?, ?, ?, ?, ?, ?, ?)
1501 ON CONFLICT (entity_id_a, entity_id_b, type) DO UPDATE SET
1502 metadata = excluded.metadata
1503 "#,
1504 )
1505 .bind(uuid::Uuid::new_v4().to_string())
1506 .bind(relationship.entity_id_a.to_string())
1507 .bind(relationship.entity_id_b.to_string())
1508 .bind(&relationship.relationship_type)
1509 .bind(relationship.agent_id.to_string())
1510 .bind(serde_json::to_string(&relationship.metadata)?)
1511 .bind(relationship.created_at.unwrap_or_else(|| chrono::Utc::now().timestamp()))
1512 .execute(&self.pool)
1513 .await?;
1514
1515 Ok(result.rows_affected() > 0)
1516 }
1517
1518 async fn get_relationship(
1519 &self,
1520 entity_id_a: UUID,
1521 entity_id_b: UUID,
1522 ) -> Result<Option<Relationship>> {
1523 let row = sqlx::query(
1524 "SELECT entity_id_a, entity_id_b, type, agent_id, metadata, created_at FROM relationships WHERE entity_id_a = ? AND entity_id_b = ?",
1525 )
1526 .bind(entity_id_a.to_string())
1527 .bind(entity_id_b.to_string())
1528 .fetch_optional(&self.pool)
1529 .await?;
1530
1531 match row {
1532 Some(row) => {
1533 let entity_a_str: String = row.get("entity_id_a");
1534 let entity_b_str: String = row.get("entity_id_b");
1535 let agent_id_str: String = row.get("agent_id");
1536 let metadata_str: String = row.get("metadata");
1537
1538 Ok(Some(Relationship {
1539 entity_id_a: uuid::Uuid::parse_str(&entity_a_str)
1540 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1541 entity_id_b: uuid::Uuid::parse_str(&entity_b_str)
1542 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1543 relationship_type: row.get("type"),
1544 agent_id: uuid::Uuid::parse_str(&agent_id_str)
1545 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1546 metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
1547 created_at: row.get("created_at"),
1548 }))
1549 }
1550 None => Ok(None),
1551 }
1552 }
1553
1554 async fn create_task(&self, task: &Task) -> Result<UUID> {
1555 let status_str = serde_json::to_string(&task.status)?
1556 .trim_matches('"')
1557 .to_string();
1558
1559 sqlx::query(
1560 r#"
1561 INSERT INTO tasks (id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at)
1562 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
1563 "#,
1564 )
1565 .bind(task.id.to_string())
1566 .bind(task.agent_id.to_string())
1567 .bind(&task.task_type)
1568 .bind(serde_json::to_string(&task.data)?)
1569 .bind(&status_str)
1570 .bind(task.priority)
1571 .bind(task.scheduled_at)
1572 .bind(task.executed_at)
1573 .bind(task.retry_count)
1574 .bind(task.max_retries)
1575 .bind(&task.error)
1576 .bind(task.created_at)
1577 .bind(task.updated_at)
1578 .execute(&self.pool)
1579 .await?;
1580
1581 Ok(task.id)
1582 }
1583
1584 async fn update_task(&self, task: &Task) -> Result<bool> {
1585 let status_str = serde_json::to_string(&task.status)?
1586 .trim_matches('"')
1587 .to_string();
1588
1589 let result = sqlx::query(
1590 r#"
1591 UPDATE tasks SET
1592 status = ?,
1593 priority = ?,
1594 scheduled_at = ?,
1595 executed_at = ?,
1596 retry_count = ?,
1597 error = ?,
1598 updated_at = ?
1599 WHERE id = ?
1600 "#,
1601 )
1602 .bind(&status_str)
1603 .bind(task.priority)
1604 .bind(task.scheduled_at)
1605 .bind(task.executed_at)
1606 .bind(task.retry_count)
1607 .bind(&task.error)
1608 .bind(chrono::Utc::now().timestamp())
1609 .bind(task.id.to_string())
1610 .execute(&self.pool)
1611 .await?;
1612
1613 Ok(result.rows_affected() > 0)
1614 }
1615
1616 async fn get_task(&self, task_id: UUID) -> Result<Option<Task>> {
1617 let row = sqlx::query(
1618 "SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at FROM tasks WHERE id = ?",
1619 )
1620 .bind(task_id.to_string())
1621 .fetch_optional(&self.pool)
1622 .await?;
1623
1624 match row {
1625 Some(row) => {
1626 let id_str: String = row.get("id");
1627 let agent_id_str: String = row.get("agent_id");
1628 let data_str: String = row.get("data");
1629 let status_str: String = row.get("status");
1630
1631 Ok(Some(Task {
1632 id: uuid::Uuid::parse_str(&id_str)
1633 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1634 agent_id: uuid::Uuid::parse_str(&agent_id_str)
1635 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1636 task_type: row.get("task_type"),
1637 data: serde_json::from_str(&data_str)?,
1638 status: serde_json::from_str(&format!("\"{}\"", status_str))
1639 .unwrap_or(TaskStatus::Pending),
1640 priority: row.get("priority"),
1641 scheduled_at: row.get("scheduled_at"),
1642 executed_at: row.get("executed_at"),
1643 retry_count: row.get("retry_count"),
1644 max_retries: row.get("max_retries"),
1645 error: row.get("error"),
1646 created_at: row.get("created_at"),
1647 updated_at: row.get("updated_at"),
1648 }))
1649 }
1650 None => Ok(None),
1651 }
1652 }
1653
1654 async fn get_pending_tasks(&self, agent_id: UUID) -> Result<Vec<Task>> {
1655 let rows = sqlx::query(
1656 r#"
1657 SELECT id, agent_id, task_type, data, status, priority, scheduled_at, executed_at, retry_count, max_retries, error, created_at, updated_at
1658 FROM tasks
1659 WHERE agent_id = ? AND status = 'PENDING'
1660 ORDER BY priority DESC, scheduled_at ASC
1661 "#,
1662 )
1663 .bind(agent_id.to_string())
1664 .fetch_all(&self.pool)
1665 .await?;
1666
1667 let mut tasks = Vec::new();
1668 for row in rows {
1669 let id_str: String = row.get("id");
1670 let agent_id_str: String = row.get("agent_id");
1671 let data_str: String = row.get("data");
1672 let status_str: String = row.get("status");
1673
1674 tasks.push(Task {
1675 id: uuid::Uuid::parse_str(&id_str)
1676 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1677 agent_id: uuid::Uuid::parse_str(&agent_id_str)
1678 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1679 task_type: row.get("task_type"),
1680 data: serde_json::from_str(&data_str)?,
1681 status: serde_json::from_str(&format!("\"{}\"", status_str))
1682 .unwrap_or(TaskStatus::Pending),
1683 priority: row.get("priority"),
1684 scheduled_at: row.get("scheduled_at"),
1685 executed_at: row.get("executed_at"),
1686 retry_count: row.get("retry_count"),
1687 max_retries: row.get("max_retries"),
1688 error: row.get("error"),
1689 created_at: row.get("created_at"),
1690 updated_at: row.get("updated_at"),
1691 });
1692 }
1693
1694 Ok(tasks)
1695 }
1696
1697 async fn log(&self, log: &Log) -> Result<()> {
1698 let id = log.id.unwrap_or_else(uuid::Uuid::new_v4);
1699
1700 sqlx::query(
1701 r#"
1702 INSERT INTO logs (id, entity_id, room_id, body, type, created_at)
1703 VALUES (?, ?, ?, ?, ?, ?)
1704 "#,
1705 )
1706 .bind(id.to_string())
1707 .bind(log.entity_id.to_string())
1708 .bind(log.room_id.map(|id| id.to_string()))
1709 .bind(serde_json::to_string(&log.body)?)
1710 .bind(&log.log_type)
1711 .bind(log.created_at)
1712 .execute(&self.pool)
1713 .await?;
1714
1715 Ok(())
1716 }
1717
1718 async fn get_logs(&self, params: LogQuery) -> Result<Vec<Log>> {
1719 let mut query = String::from(
1720 "SELECT id, entity_id, room_id, body, type, created_at FROM logs WHERE 1=1",
1721 );
1722 let mut bindings: Vec<String> = Vec::new();
1723
1724 if let Some(entity_id) = params.entity_id {
1725 query.push_str(" AND entity_id = ?");
1726 bindings.push(entity_id.to_string());
1727 }
1728 if let Some(room_id) = params.room_id {
1729 query.push_str(" AND room_id = ?");
1730 bindings.push(room_id.to_string());
1731 }
1732 if let Some(log_type) = params.log_type {
1733 query.push_str(" AND type = ?");
1734 bindings.push(log_type);
1735 }
1736
1737 query.push_str(" ORDER BY created_at DESC");
1738
1739 if let Some(limit) = params.limit {
1740 query.push_str(&format!(" LIMIT {}", limit));
1741 }
1742 if let Some(offset) = params.offset {
1743 query.push_str(&format!(" OFFSET {}", offset));
1744 }
1745
1746 let mut query_builder = sqlx::query(&query);
1747 for binding in &bindings {
1748 query_builder = query_builder.bind(binding);
1749 }
1750
1751 let rows = query_builder.fetch_all(&self.pool).await?;
1752
1753 let mut logs = Vec::new();
1754 for row in rows {
1755 let id_str: String = row.get("id");
1756 let entity_id_str: String = row.get("entity_id");
1757 let room_id_str: Option<String> = row.get("room_id");
1758 let body_str: String = row.get("body");
1759
1760 logs.push(Log {
1761 id: Some(
1762 uuid::Uuid::parse_str(&id_str)
1763 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1764 ),
1765 entity_id: uuid::Uuid::parse_str(&entity_id_str)
1766 .map_err(|e| ZoeyError::database(format!("Invalid UUID: {}", e)))?,
1767 room_id: room_id_str
1768 .map(|s| uuid::Uuid::parse_str(&s).ok())
1769 .flatten(),
1770 body: serde_json::from_str(&body_str)?,
1771 log_type: row.get("type"),
1772 created_at: row.get("created_at"),
1773 });
1774 }
1775
1776 Ok(logs)
1777 }
1778
1779 async fn get_agent_run_summaries(
1780 &self,
1781 _params: RunSummaryQuery,
1782 ) -> Result<AgentRunSummaryResult> {
1783 Ok(AgentRunSummaryResult {
1785 runs: vec![],
1786 total: 0,
1787 has_more: false,
1788 })
1789 }
1790}
1791
#[cfg(test)]
mod tests {
    use super::*;

    /// Type-checks the constructor call without touching a database.
    #[test]
    fn test_sqlite_adapter_creation() {
        // Futures are lazy: this one is built but never polled, so no I/O
        // happens. Exercising a real connection needs an async runtime.
        let pending = SqliteAdapter::new("sqlite::memory:");
        drop(pending);
    }

    /// Stubbed methods must surface an error rather than a silent success.
    #[test]
    fn test_not_implemented_reports_error() {
        let outcome: Result<()> = not_implemented!("probe");
        assert!(outcome.is_err());
    }
}