Skip to main content

ares/db/
turso.rs

1use crate::types::{AppError, MemoryFact, Message, MessageRole, Preference, Result};
2use chrono::Utc;
3use libsql::{params, Builder, Connection, Database};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
/// One conversation as read back from the `conversations` table.
#[derive(Clone, Debug)]
pub struct Conversation {
    /// Primary key of the conversation
    pub id: String,
    /// Identifier of the user the conversation belongs to
    pub user_id: String,
    /// Title of the conversation; `None` when no title is stored
    pub title: Option<String>,
    /// How many messages the conversation holds
    pub message_count: i32,
    /// Creation time rendered as an RFC3339 string
    pub created_at: String,
    /// Time of the latest update rendered as an RFC3339 string
    pub updated_at: String,
}
23
24/// Turso/libSQL database client for persistent storage
25///
26/// Supports both remote Turso databases and local SQLite files.
27/// Handles connection pooling and schema initialization automatically.
28pub struct TursoClient {
29    db: Database,
30    /// Cached connection for in-memory databases to ensure schema persists
31    cached_conn: Arc<Mutex<Option<Connection>>>,
32    is_memory: bool,
33}
34
35impl TursoClient {
36    /// Create a new TursoClient with remote Turso database
37    pub async fn new_remote(url: String, auth_token: String) -> Result<Self> {
38        let db = Builder::new_remote(url, auth_token)
39            .build()
40            .await
41            .map_err(|e| AppError::Database(format!("Failed to connect to Turso: {}", e)))?;
42
43        let client = Self {
44            db,
45            cached_conn: Arc::new(Mutex::new(None)),
46            is_memory: false,
47        };
48        client.initialize_schema().await?;
49
50        Ok(client)
51    }
52
53    /// Create a new TursoClient with local SQLite database
54    pub async fn new_local(path: &str) -> Result<Self> {
55        let is_memory = path == ":memory:";
56        let db = Builder::new_local(path)
57            .build()
58            .await
59            .map_err(|e| AppError::Database(format!("Failed to open local database: {}", e)))?;
60
61        let client = Self {
62            db,
63            cached_conn: Arc::new(Mutex::new(None)),
64            is_memory,
65        };
66
67        // For in-memory databases, we need to cache the connection
68        // so that schema persists across calls
69        if is_memory {
70            let conn = client
71                .db
72                .connect()
73                .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))?;
74            *client.cached_conn.lock().await = Some(conn);
75        }
76
77        client.initialize_schema().await?;
78
79        Ok(client)
80    }
81
82    /// Create a new TursoClient with in-memory database (useful for testing)
83    pub async fn new_memory() -> Result<Self> {
84        Self::new_local(":memory:").await
85    }
86
87    /// Create client based on URL format - routes to local or remote
88    pub async fn new(url: String, auth_token: String) -> Result<Self> {
89        // If URL starts with "file:" or is a path, use local mode
90        if url.starts_with("file:") || url.ends_with(".db") || url == ":memory:" {
91            Self::new_local(&url).await
92        } else if url.starts_with("libsql://") || url.starts_with("https://") {
93            Self::new_remote(url, auth_token).await
94        } else {
95            // Default to local with the URL as path
96            Self::new_local(&url).await
97        }
98    }
99
100    /// Get a raw database connection (prefer `operation_conn` for most uses)
101    pub fn connection(&self) -> Result<Connection> {
102        self.db
103            .connect()
104            .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))
105    }
106
107    /// Get the connection to use for operations (handles in-memory vs file-based)
108    pub async fn operation_conn(&self) -> Result<Connection> {
109        if self.is_memory {
110            let guard = self.cached_conn.lock().await;
111            guard.as_ref().cloned().ok_or_else(|| {
112                AppError::Database("No cached connection for in-memory database".to_string())
113            })
114        } else {
115            self.connection()
116        }
117    }
118
119    async fn initialize_schema(&self) -> Result<()> {
120        let conn = self.operation_conn().await?;
121
122        // Users table
123        conn.execute(
124            "CREATE TABLE IF NOT EXISTS users (
125                id TEXT PRIMARY KEY,
126                email TEXT UNIQUE NOT NULL,
127                password_hash TEXT NOT NULL,
128                name TEXT NOT NULL,
129                created_at INTEGER NOT NULL,
130                updated_at INTEGER NOT NULL
131            )",
132            (),
133        )
134        .await
135        .map_err(|e| AppError::Database(format!("Failed to create users table: {}", e)))?;
136
137        // Sessions table
138        conn.execute(
139            "CREATE TABLE IF NOT EXISTS sessions (
140                id TEXT PRIMARY KEY,
141                user_id TEXT NOT NULL,
142                token_hash TEXT NOT NULL,
143                expires_at INTEGER NOT NULL,
144                created_at INTEGER NOT NULL,
145                FOREIGN KEY (user_id) REFERENCES users(id)
146            )",
147            (),
148        )
149        .await
150        .map_err(|e| AppError::Database(format!("Failed to create sessions table: {}", e)))?;
151
152        // Conversations table
153        conn.execute(
154            "CREATE TABLE IF NOT EXISTS conversations (
155                id TEXT PRIMARY KEY,
156                user_id TEXT NOT NULL,
157                title TEXT,
158                created_at INTEGER NOT NULL,
159                updated_at INTEGER NOT NULL,
160                FOREIGN KEY (user_id) REFERENCES users(id)
161            )",
162            (),
163        )
164        .await
165        .map_err(|e| AppError::Database(format!("Failed to create conversations table: {}", e)))?;
166
167        // Messages table
168        conn.execute(
169            "CREATE TABLE IF NOT EXISTS messages (
170                id TEXT PRIMARY KEY,
171                conversation_id TEXT NOT NULL,
172                role TEXT NOT NULL,
173                content TEXT NOT NULL,
174                timestamp INTEGER NOT NULL,
175                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
176            )",
177            (),
178        )
179        .await
180        .map_err(|e| AppError::Database(format!("Failed to create messages table: {}", e)))?;
181
182        // Memory facts table
183        conn.execute(
184            "CREATE TABLE IF NOT EXISTS memory_facts (
185                id TEXT PRIMARY KEY,
186                user_id TEXT NOT NULL,
187                category TEXT NOT NULL,
188                fact_key TEXT NOT NULL,
189                fact_value TEXT NOT NULL,
190                confidence REAL NOT NULL,
191                created_at INTEGER NOT NULL,
192                updated_at INTEGER NOT NULL,
193                FOREIGN KEY (user_id) REFERENCES users(id)
194            )",
195            (),
196        )
197        .await
198        .map_err(|e| AppError::Database(format!("Failed to create memory_facts table: {}", e)))?;
199
200        // Preferences table
201        conn.execute(
202            "CREATE TABLE IF NOT EXISTS preferences (
203                id TEXT PRIMARY KEY,
204                user_id TEXT NOT NULL,
205                category TEXT NOT NULL,
206                key TEXT NOT NULL,
207                value TEXT NOT NULL,
208                confidence REAL NOT NULL,
209                created_at INTEGER NOT NULL,
210                FOREIGN KEY (user_id) REFERENCES users(id),
211                UNIQUE(user_id, category, key)
212            )",
213            (),
214        )
215        .await
216        .map_err(|e| AppError::Database(format!("Failed to create preferences table: {}", e)))?;
217
218        // User-created agents table (stores TOON-compatible agent configs)
219        conn.execute(
220            "CREATE TABLE IF NOT EXISTS user_agents (
221                id TEXT PRIMARY KEY,
222                user_id TEXT NOT NULL,
223                name TEXT NOT NULL,
224                display_name TEXT,
225                description TEXT,
226                model TEXT NOT NULL,
227                system_prompt TEXT,
228                tools TEXT DEFAULT '[]',
229                max_tool_iterations INTEGER DEFAULT 10,
230                parallel_tools INTEGER DEFAULT 0,
231                extra TEXT DEFAULT '{}',
232                is_public INTEGER DEFAULT 0,
233                usage_count INTEGER DEFAULT 0,
234                rating_sum INTEGER DEFAULT 0,
235                rating_count INTEGER DEFAULT 0,
236                created_at INTEGER NOT NULL,
237                updated_at INTEGER NOT NULL,
238                FOREIGN KEY (user_id) REFERENCES users(id),
239                UNIQUE(user_id, name)
240            )",
241            (),
242        )
243        .await
244        .map_err(|e| AppError::Database(format!("Failed to create user_agents table: {}", e)))?;
245
246        // Create index for agent lookup
247        conn.execute(
248            "CREATE INDEX IF NOT EXISTS idx_user_agents_lookup ON user_agents(user_id, name)",
249            (),
250        )
251        .await
252        .map_err(|e| {
253            AppError::Database(format!("Failed to create user_agents_lookup index: {}", e))
254        })?;
255
256        // Create index for public agent discovery
257        conn.execute(
258            "CREATE INDEX IF NOT EXISTS idx_user_agents_public ON user_agents(is_public, usage_count DESC)",
259            (),
260        )
261        .await
262        .map_err(|e| {
263            AppError::Database(format!("Failed to create user_agents_public index: {}", e))
264        })?;
265
266        // User-created tools table
267        conn.execute(
268            "CREATE TABLE IF NOT EXISTS user_tools (
269                id TEXT PRIMARY KEY,
270                user_id TEXT NOT NULL,
271                name TEXT NOT NULL,
272                display_name TEXT,
273                description TEXT,
274                enabled INTEGER DEFAULT 1,
275                timeout_secs INTEGER DEFAULT 30,
276                tool_type TEXT NOT NULL,
277                config TEXT DEFAULT '{}',
278                parameters TEXT DEFAULT '{}',
279                extra TEXT DEFAULT '{}',
280                is_public INTEGER DEFAULT 0,
281                usage_count INTEGER DEFAULT 0,
282                created_at INTEGER NOT NULL,
283                updated_at INTEGER NOT NULL,
284                FOREIGN KEY (user_id) REFERENCES users(id),
285                UNIQUE(user_id, name)
286            )",
287            (),
288        )
289        .await
290        .map_err(|e| AppError::Database(format!("Failed to create user_tools table: {}", e)))?;
291
292        // User-created MCP configurations table
293        conn.execute(
294            "CREATE TABLE IF NOT EXISTS user_mcps (
295                id TEXT PRIMARY KEY,
296                user_id TEXT NOT NULL,
297                name TEXT NOT NULL,
298                enabled INTEGER DEFAULT 1,
299                command TEXT NOT NULL,
300                args TEXT DEFAULT '[]',
301                env TEXT DEFAULT '{}',
302                timeout_secs INTEGER DEFAULT 30,
303                is_public INTEGER DEFAULT 0,
304                created_at INTEGER NOT NULL,
305                updated_at INTEGER NOT NULL,
306                FOREIGN KEY (user_id) REFERENCES users(id),
307                UNIQUE(user_id, name)
308            )",
309            (),
310        )
311        .await
312        .map_err(|e| AppError::Database(format!("Failed to create user_mcps table: {}", e)))?;
313
314        // Agent execution logs for analytics
315        conn.execute(
316            "CREATE TABLE IF NOT EXISTS agent_executions (
317                id TEXT PRIMARY KEY,
318                agent_id TEXT,
319                agent_name TEXT NOT NULL,
320                user_id TEXT NOT NULL,
321                input TEXT NOT NULL,
322                output TEXT,
323                tool_calls TEXT,
324                tokens_input INTEGER,
325                tokens_output INTEGER,
326                duration_ms INTEGER,
327                status TEXT NOT NULL,
328                error_message TEXT,
329                created_at INTEGER NOT NULL,
330                FOREIGN KEY (user_id) REFERENCES users(id)
331            )",
332            (),
333        )
334        .await
335        .map_err(|e| {
336            AppError::Database(format!("Failed to create agent_executions table: {}", e))
337        })?;
338
339        // Create indexes for execution logs
340        conn.execute(
341            "CREATE INDEX IF NOT EXISTS idx_executions_user ON agent_executions(user_id, created_at DESC)",
342            (),
343        )
344        .await
345        .map_err(|e| AppError::Database(format!("Failed to create executions_user index: {}", e)))?;
346
347        conn.execute(
348            "CREATE INDEX IF NOT EXISTS idx_executions_agent ON agent_executions(agent_name, created_at DESC)",
349            (),
350        )
351        .await
352        .map_err(|e| {
353            AppError::Database(format!("Failed to create executions_agent index: {}", e))
354        })?;
355
356        Ok(())
357    }
358
359    /// Creates a new user account
360    ///
361    /// # Arguments
362    /// * `id` - Unique user identifier
363    /// * `email` - User's email address (must be unique)
364    /// * `password_hash` - Argon2 hashed password
365    /// * `name` - User's display name
366    pub async fn create_user(
367        &self,
368        id: &str,
369        email: &str,
370        password_hash: &str,
371        name: &str,
372    ) -> Result<()> {
373        let conn = self.operation_conn().await?;
374        let now = Utc::now().timestamp();
375
376        conn.execute(
377            "INSERT INTO users (id, email, password_hash, name, created_at, updated_at)
378              VALUES (?, ?, ?, ?, ?, ?)",
379            (id, email, password_hash, name, now, now),
380        )
381        .await
382        .map_err(|e| AppError::Database(format!("Failed to create user: {}", e)))?;
383
384        Ok(())
385    }
386
387    /// Retrieves a user by email address
388    pub async fn get_user_by_email(&self, email: &str) -> Result<Option<User>> {
389        let conn = self.operation_conn().await?;
390
391        let mut rows = conn
392            .query(
393                "SELECT id, email, password_hash, name, created_at, updated_at
394                 FROM users WHERE email = ?",
395                [email],
396            )
397            .await
398            .map_err(|e| AppError::Database(format!("Failed to query user: {}", e)))?;
399
400        if let Some(row) = rows
401            .next()
402            .await
403            .map_err(|e| AppError::Database(e.to_string()))?
404        {
405            Ok(Some(User {
406                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
407                email: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
408                password_hash: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
409                name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
410                created_at: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
411                updated_at: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
412            }))
413        } else {
414            Ok(None)
415        }
416    }
417
418    /// Creates a new authentication session
419    ///
420    /// # Arguments
421    /// * `id` - Unique session identifier
422    /// * `user_id` - ID of the authenticated user
423    /// * `token_hash` - Hash of the JWT refresh token
424    /// * `expires_at` - Unix timestamp when session expires
425    pub async fn create_session(
426        &self,
427        id: &str,
428        user_id: &str,
429        token_hash: &str,
430        expires_at: i64,
431    ) -> Result<()> {
432        let conn = self.operation_conn().await?;
433        let now = Utc::now().timestamp();
434
435        conn.execute(
436            "INSERT INTO sessions (id, user_id, token_hash, expires_at, created_at)
437             VALUES (?, ?, ?, ?, ?)",
438            (id, user_id, token_hash, expires_at, now),
439        )
440        .await
441        .map_err(|e| AppError::Database(format!("Failed to create session: {}", e)))?;
442
443        Ok(())
444    }
445
446    /// Creates a new conversation for a user
447    pub async fn create_conversation(
448        &self,
449        id: &str,
450        user_id: &str,
451        title: Option<&str>,
452    ) -> Result<()> {
453        let conn = self.operation_conn().await?;
454        let now = Utc::now().timestamp();
455
456        conn.execute(
457            "INSERT INTO conversations (id, user_id, title, created_at, updated_at)
458             VALUES (?, ?, ?, ?, ?)",
459            (id, user_id, title, now, now),
460        )
461        .await
462        .map_err(|e| AppError::Database(format!("Failed to create conversation: {}", e)))?;
463
464        Ok(())
465    }
466
467    /// Checks if a conversation exists by ID
468    pub async fn conversation_exists(&self, conversation_id: &str) -> Result<bool> {
469        let conn = self.operation_conn().await?;
470
471        let mut rows = conn
472            .query(
473                "SELECT 1 FROM conversations WHERE id = ?",
474                [conversation_id],
475            )
476            .await
477            .map_err(|e| AppError::Database(format!("Failed to check conversation: {}", e)))?;
478
479        Ok(rows
480            .next()
481            .await
482            .map_err(|e| AppError::Database(e.to_string()))?
483            .is_some())
484    }
485
486    /// Adds a message to a conversation
487    pub async fn add_message(
488        &self,
489        id: &str,
490        conversation_id: &str,
491        role: MessageRole,
492        content: &str,
493    ) -> Result<()> {
494        let conn = self.operation_conn().await?;
495        let now = Utc::now().timestamp();
496        let role_str = match role {
497            MessageRole::System => "system",
498            MessageRole::User => "user",
499            MessageRole::Assistant => "assistant",
500        };
501
502        conn.execute(
503            "INSERT INTO messages (id, conversation_id, role, content, timestamp)
504             VALUES (?, ?, ?, ?, ?)",
505            (id, conversation_id, role_str, content, now),
506        )
507        .await
508        .map_err(|e| AppError::Database(format!("Failed to add message: {}", e)))?;
509
510        Ok(())
511    }
512
513    /// Retrieves all messages in a conversation, ordered by timestamp
514    pub async fn get_conversation_history(&self, conversation_id: &str) -> Result<Vec<Message>> {
515        let conn = self.operation_conn().await?;
516
517        let mut rows = conn
518            .query(
519                "SELECT role, content, timestamp FROM messages
520                 WHERE conversation_id = ? ORDER BY timestamp ASC",
521                [conversation_id],
522            )
523            .await
524            .map_err(|e| AppError::Database(format!("Failed to query messages: {}", e)))?;
525
526        let mut messages = Vec::new();
527        while let Some(row) = rows
528            .next()
529            .await
530            .map_err(|e| AppError::Database(e.to_string()))?
531        {
532            let role_str: String = row.get(0).map_err(|e| AppError::Database(e.to_string()))?;
533            let role = match role_str.as_str() {
534                "system" => MessageRole::System,
535                "user" => MessageRole::User,
536                "assistant" => MessageRole::Assistant,
537                _ => MessageRole::User,
538            };
539
540            messages.push(Message {
541                role,
542                content: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
543                timestamp: chrono::DateTime::from_timestamp(
544                    row.get::<i64>(2)
545                        .map_err(|e| AppError::Database(e.to_string()))?,
546                    0,
547                )
548                .unwrap(),
549            });
550        }
551
552        Ok(messages)
553    }
554
555    /// Get a conversation by ID
556    pub async fn get_conversation(&self, conversation_id: &str) -> Result<Conversation> {
557        let conn = self.operation_conn().await?;
558
559        let mut rows = conn
560            .query(
561                "SELECT id, user_id, title, created_at, updated_at FROM conversations WHERE id = ?",
562                [conversation_id],
563            )
564            .await
565            .map_err(|e| AppError::Database(format!("Failed to query conversation: {}", e)))?;
566
567        if let Some(row) = rows
568            .next()
569            .await
570            .map_err(|e| AppError::Database(e.to_string()))?
571        {
572            let created_ts: i64 = row.get(3).map_err(|e| AppError::Database(e.to_string()))?;
573            let updated_ts: i64 = row.get(4).map_err(|e| AppError::Database(e.to_string()))?;
574
575            Ok(Conversation {
576                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
577                user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
578                title: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
579                message_count: 0, // Will be populated separately if needed
580                created_at: chrono::DateTime::from_timestamp(created_ts, 0)
581                    .map(|dt| dt.to_rfc3339())
582                    .unwrap_or_default(),
583                updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
584                    .map(|dt| dt.to_rfc3339())
585                    .unwrap_or_default(),
586            })
587        } else {
588            Err(AppError::NotFound(format!(
589                "Conversation {} not found",
590                conversation_id
591            )))
592        }
593    }
594
595    /// Get all conversations for a user
596    pub async fn get_user_conversations(&self, user_id: &str) -> Result<Vec<Conversation>> {
597        let conn = self.operation_conn().await?;
598
599        let mut rows = conn
600            .query(
601                "SELECT c.id, c.user_id, c.title, c.created_at, c.updated_at,
602                        (SELECT COUNT(*) FROM messages m WHERE m.conversation_id = c.id) as msg_count
603                 FROM conversations c
604                 WHERE c.user_id = ?
605                 ORDER BY c.updated_at DESC",
606                [user_id],
607            )
608            .await
609            .map_err(|e| AppError::Database(format!("Failed to query conversations: {}", e)))?;
610
611        let mut conversations = Vec::new();
612        while let Some(row) = rows
613            .next()
614            .await
615            .map_err(|e| AppError::Database(e.to_string()))?
616        {
617            let created_ts: i64 = row.get(3).map_err(|e| AppError::Database(e.to_string()))?;
618            let updated_ts: i64 = row.get(4).map_err(|e| AppError::Database(e.to_string()))?;
619
620            conversations.push(Conversation {
621                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
622                user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
623                title: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
624                message_count: row.get::<i32>(5).unwrap_or(0),
625                created_at: chrono::DateTime::from_timestamp(created_ts, 0)
626                    .map(|dt| dt.to_rfc3339())
627                    .unwrap_or_default(),
628                updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
629                    .map(|dt| dt.to_rfc3339())
630                    .unwrap_or_default(),
631            });
632        }
633
634        Ok(conversations)
635    }
636
637    /// Update conversation title
638    pub async fn update_conversation_title(
639        &self,
640        conversation_id: &str,
641        title: Option<&str>,
642    ) -> Result<()> {
643        let conn = self.operation_conn().await?;
644        let now = Utc::now().timestamp();
645
646        conn.execute(
647            "UPDATE conversations SET title = ?, updated_at = ? WHERE id = ?",
648            (title, now, conversation_id),
649        )
650        .await
651        .map_err(|e| AppError::Database(format!("Failed to update conversation: {}", e)))?;
652
653        Ok(())
654    }
655
656    /// Delete a conversation and all its messages
657    pub async fn delete_conversation(&self, conversation_id: &str) -> Result<()> {
658        let conn = self.operation_conn().await?;
659
660        // Delete messages first (foreign key constraint)
661        conn.execute(
662            "DELETE FROM messages WHERE conversation_id = ?",
663            [conversation_id],
664        )
665        .await
666        .map_err(|e| AppError::Database(format!("Failed to delete messages: {}", e)))?;
667
668        // Delete conversation
669        conn.execute("DELETE FROM conversations WHERE id = ?", [conversation_id])
670            .await
671            .map_err(|e| AppError::Database(format!("Failed to delete conversation: {}", e)))?;
672
673        Ok(())
674    }
675
676    /// Stores a memory fact for a user (upserts on id)
677    pub async fn store_memory_fact(&self, fact: &MemoryFact) -> Result<()> {
678        let conn = self.operation_conn().await?;
679
680        conn.execute(
681            "INSERT OR REPLACE INTO memory_facts
682            (id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at)
683            VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
684            (
685                fact.id.as_str(),
686                fact.user_id.as_str(),
687                fact.category.as_str(),
688                fact.fact_key.as_str(),
689                fact.fact_value.as_str(),
690                fact.confidence as f64,
691                fact.created_at.timestamp(),
692                fact.updated_at.timestamp(),
693            ),
694        )
695        .await
696        .map_err(|e| AppError::Database(format!("Failed to store memory fact: {}", e)))?;
697
698        Ok(())
699    }
700
701    /// Retrieves all memory facts for a user
702    pub async fn get_user_memory(&self, user_id: &str) -> Result<Vec<MemoryFact>> {
703        let conn = self.operation_conn().await?;
704
705        let mut rows = conn
706            .query(
707                "SELECT id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at
708                FROM memory_facts WHERE user_id = ?",
709                [user_id],
710            )
711            .await
712            .map_err(|e| AppError::Database(format!("Failed to query memory facts: {}", e)))?;
713
714        let mut facts = Vec::new();
715        while let Some(row) = rows
716            .next()
717            .await
718            .map_err(|e| AppError::Database(e.to_string()))?
719        {
720            facts.push(MemoryFact {
721                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
722                user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
723                category: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
724                fact_key: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
725                fact_value: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
726                confidence: row
727                    .get::<f64>(5)
728                    .map_err(|e| AppError::Database(e.to_string()))?
729                    as f32,
730                created_at: chrono::DateTime::from_timestamp(
731                    row.get::<i64>(6)
732                        .map_err(|e| AppError::Database(e.to_string()))?,
733                    0,
734                )
735                .unwrap(),
736                updated_at: chrono::DateTime::from_timestamp(
737                    row.get::<i64>(7)
738                        .map_err(|e| AppError::Database(e.to_string()))?,
739                    0,
740                )
741                .unwrap(),
742            });
743        }
744
745        Ok(facts)
746    }
747
748    /// Stores a user preference (upserts on user_id + category + key)
749    pub async fn store_preference(&self, user_id: &str, preference: &Preference) -> Result<()> {
750        let conn = self.operation_conn().await?;
751        let now = Utc::now().timestamp();
752        let id = uuid::Uuid::new_v4().to_string();
753
754        conn.execute(
755            "INSERT OR REPLACE INTO preferences
756             (id, user_id, category, key, value, confidence, created_at)
757             VALUES (?, ?, ?, ?, ?, ?, ?)",
758            (
759                id,
760                user_id,
761                preference.category.as_str(),
762                preference.key.as_str(),
763                preference.value.as_str(),
764                preference.confidence as f64,
765                now,
766            ),
767        )
768        .await
769        .map_err(|e| AppError::Database(format!("Failed to store preference: {}", e)))?;
770
771        Ok(())
772    }
773
774    /// Retrieves all preferences for a user
775    pub async fn get_user_preferences(&self, user_id: &str) -> Result<Vec<Preference>> {
776        let conn = self.operation_conn().await?;
777
778        let mut rows = conn
779            .query(
780                "SELECT category, key, value, confidence FROM preferences WHERE user_id = ?",
781                [user_id],
782            )
783            .await
784            .map_err(|e| AppError::Database(format!("Failed to query preferences: {}", e)))?;
785
786        let mut preferences = Vec::new();
787        while let Some(row) = rows
788            .next()
789            .await
790            .map_err(|e| AppError::Database(e.to_string()))?
791        {
792            preferences.push(Preference {
793                category: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
794                key: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
795                value: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
796                confidence: row
797                    .get::<f64>(3)
798                    .map_err(|e| AppError::Database(e.to_string()))?
799                    as f32,
800            });
801        }
802
803        Ok(preferences)
804    }
805
806    // ============= User Agent Operations =============
807
808    /// Create a new user-defined agent
809    pub async fn create_user_agent(&self, agent: &UserAgent) -> Result<()> {
810        let conn = self.operation_conn().await?;
811
812        // Convert Option<String> to Option<&str> for libsql compatibility
813        let display_name = agent.display_name.as_deref();
814        let description = agent.description.as_deref();
815        let system_prompt = agent.system_prompt.as_deref();
816
817        conn.execute(
818            "INSERT INTO user_agents (
819                id, user_id, name, display_name, description, model, system_prompt,
820                tools, max_tool_iterations, parallel_tools, extra, is_public,
821                usage_count, rating_sum, rating_count, created_at, updated_at
822            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
823            params![
824                agent.id.as_str(),
825                agent.user_id.as_str(),
826                agent.name.as_str(),
827                display_name,
828                description,
829                agent.model.as_str(),
830                system_prompt,
831                agent.tools.as_str(),
832                agent.max_tool_iterations,
833                agent.parallel_tools as i32,
834                agent.extra.as_str(),
835                agent.is_public as i32,
836                agent.usage_count,
837                agent.rating_sum,
838                agent.rating_count,
839                agent.created_at,
840                agent.updated_at,
841            ],
842        )
843        .await
844        .map_err(|e| AppError::Database(format!("Failed to create user agent: {}", e)))?;
845
846        Ok(())
847    }
848
849    /// Get a user agent by ID
850    pub async fn get_user_agent(&self, id: &str) -> Result<Option<UserAgent>> {
851        let conn = self.operation_conn().await?;
852
853        let mut rows = conn
854            .query(
855                "SELECT id, user_id, name, display_name, description, model, system_prompt,
856                        tools, max_tool_iterations, parallel_tools, extra, is_public,
857                        usage_count, rating_sum, rating_count, created_at, updated_at
858                 FROM user_agents WHERE id = ?",
859                [id],
860            )
861            .await
862            .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
863
864        if let Some(row) = rows
865            .next()
866            .await
867            .map_err(|e| AppError::Database(e.to_string()))?
868        {
869            Ok(Some(Self::row_to_user_agent(&row)?))
870        } else {
871            Ok(None)
872        }
873    }
874
875    /// Get a user agent by user_id and name
876    pub async fn get_user_agent_by_name(
877        &self,
878        user_id: &str,
879        name: &str,
880    ) -> Result<Option<UserAgent>> {
881        let conn = self.operation_conn().await?;
882
883        let mut rows = conn
884            .query(
885                "SELECT id, user_id, name, display_name, description, model, system_prompt,
886                        tools, max_tool_iterations, parallel_tools, extra, is_public,
887                        usage_count, rating_sum, rating_count, created_at, updated_at
888                 FROM user_agents WHERE user_id = ? AND name = ?",
889                (user_id, name),
890            )
891            .await
892            .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
893
894        if let Some(row) = rows
895            .next()
896            .await
897            .map_err(|e| AppError::Database(e.to_string()))?
898        {
899            Ok(Some(Self::row_to_user_agent(&row)?))
900        } else {
901            Ok(None)
902        }
903    }
904
905    /// Get a public agent by name (for community discovery)
906    pub async fn get_public_agent_by_name(&self, name: &str) -> Result<Option<UserAgent>> {
907        let conn = self.operation_conn().await?;
908
909        let mut rows = conn
910            .query(
911                "SELECT id, user_id, name, display_name, description, model, system_prompt,
912                        tools, max_tool_iterations, parallel_tools, extra, is_public,
913                        usage_count, rating_sum, rating_count, created_at, updated_at
914                 FROM user_agents WHERE name = ? AND is_public = 1
915                 ORDER BY usage_count DESC LIMIT 1",
916                [name],
917            )
918            .await
919            .map_err(|e| AppError::Database(format!("Failed to query public agent: {}", e)))?;
920
921        if let Some(row) = rows
922            .next()
923            .await
924            .map_err(|e| AppError::Database(e.to_string()))?
925        {
926            Ok(Some(Self::row_to_user_agent(&row)?))
927        } else {
928            Ok(None)
929        }
930    }
931
932    /// List all agents for a user
933    pub async fn list_user_agents(&self, user_id: &str) -> Result<Vec<UserAgent>> {
934        let conn = self.operation_conn().await?;
935
936        let mut rows = conn
937            .query(
938                "SELECT id, user_id, name, display_name, description, model, system_prompt,
939                        tools, max_tool_iterations, parallel_tools, extra, is_public,
940                        usage_count, rating_sum, rating_count, created_at, updated_at
941                 FROM user_agents WHERE user_id = ? ORDER BY updated_at DESC",
942                [user_id],
943            )
944            .await
945            .map_err(|e| AppError::Database(format!("Failed to list user agents: {}", e)))?;
946
947        let mut agents = Vec::new();
948        while let Some(row) = rows
949            .next()
950            .await
951            .map_err(|e| AppError::Database(e.to_string()))?
952        {
953            agents.push(Self::row_to_user_agent(&row)?);
954        }
955
956        Ok(agents)
957    }
958
959    /// List public agents (community/marketplace)
960    pub async fn list_public_agents(&self, limit: u32, offset: u32) -> Result<Vec<UserAgent>> {
961        let conn = self.operation_conn().await?;
962
963        let mut rows = conn
964            .query(
965                "SELECT id, user_id, name, display_name, description, model, system_prompt,
966                        tools, max_tool_iterations, parallel_tools, extra, is_public,
967                        usage_count, rating_sum, rating_count, created_at, updated_at
968                 FROM user_agents WHERE is_public = 1
969                 ORDER BY usage_count DESC LIMIT ? OFFSET ?",
970                (limit, offset),
971            )
972            .await
973            .map_err(|e| AppError::Database(format!("Failed to list public agents: {}", e)))?;
974
975        let mut agents = Vec::new();
976        while let Some(row) = rows
977            .next()
978            .await
979            .map_err(|e| AppError::Database(e.to_string()))?
980        {
981            agents.push(Self::row_to_user_agent(&row)?);
982        }
983
984        Ok(agents)
985    }
986
987    /// Update a user agent
988    pub async fn update_user_agent(&self, agent: &UserAgent) -> Result<()> {
989        let conn = self.operation_conn().await?;
990
991        // Convert Option<String> to Option<&str> for libsql compatibility
992        let display_name = agent.display_name.as_deref();
993        let description = agent.description.as_deref();
994        let system_prompt = agent.system_prompt.as_deref();
995
996        conn.execute(
997            "UPDATE user_agents SET
998                display_name = ?1, description = ?2, model = ?3, system_prompt = ?4,
999                tools = ?5, max_tool_iterations = ?6, parallel_tools = ?7, extra = ?8,
1000                is_public = ?9, updated_at = ?10
1001             WHERE id = ?11 AND user_id = ?12",
1002            params![
1003                display_name,
1004                description,
1005                agent.model.as_str(),
1006                system_prompt,
1007                agent.tools.as_str(),
1008                agent.max_tool_iterations,
1009                agent.parallel_tools as i32,
1010                agent.extra.as_str(),
1011                agent.is_public as i32,
1012                agent.updated_at,
1013                agent.id.as_str(),
1014                agent.user_id.as_str(),
1015            ],
1016        )
1017        .await
1018        .map_err(|e| AppError::Database(format!("Failed to update user agent: {}", e)))?;
1019
1020        Ok(())
1021    }
1022
1023    /// Delete a user agent
1024    pub async fn delete_user_agent(&self, id: &str, user_id: &str) -> Result<bool> {
1025        let conn = self.operation_conn().await?;
1026
1027        let affected = conn
1028            .execute(
1029                "DELETE FROM user_agents WHERE id = ? AND user_id = ?",
1030                (id, user_id),
1031            )
1032            .await
1033            .map_err(|e| AppError::Database(format!("Failed to delete user agent: {}", e)))?;
1034
1035        Ok(affected > 0)
1036    }
1037
1038    /// Increment usage count for an agent
1039    pub async fn increment_agent_usage(&self, id: &str) -> Result<()> {
1040        let conn = self.operation_conn().await?;
1041
1042        conn.execute(
1043            "UPDATE user_agents SET usage_count = usage_count + 1 WHERE id = ?",
1044            [id],
1045        )
1046        .await
1047        .map_err(|e| AppError::Database(format!("Failed to increment agent usage: {}", e)))?;
1048
1049        Ok(())
1050    }
1051
1052    /// Helper to convert a database row to UserAgent
1053    fn row_to_user_agent(row: &libsql::Row) -> Result<UserAgent> {
1054        Ok(UserAgent {
1055            id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
1056            user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
1057            name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
1058            display_name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
1059            description: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
1060            model: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
1061            system_prompt: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
1062            tools: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
1063            max_tool_iterations: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
1064            parallel_tools: row
1065                .get::<i32>(9)
1066                .map_err(|e| AppError::Database(e.to_string()))?
1067                != 0,
1068            extra: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
1069            is_public: row
1070                .get::<i32>(11)
1071                .map_err(|e| AppError::Database(e.to_string()))?
1072                != 0,
1073            usage_count: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
1074            rating_sum: row.get(13).map_err(|e| AppError::Database(e.to_string()))?,
1075            rating_count: row.get(14).map_err(|e| AppError::Database(e.to_string()))?,
1076            created_at: row.get(15).map_err(|e| AppError::Database(e.to_string()))?,
1077            updated_at: row.get(16).map_err(|e| AppError::Database(e.to_string()))?,
1078        })
1079    }
1080
1081    // ============= Agent Execution Logging =============
1082
1083    /// Log an agent execution for analytics
1084    pub async fn log_agent_execution(&self, execution: &AgentExecution) -> Result<()> {
1085        let conn = self.operation_conn().await?;
1086
1087        // Convert Option<String> to Option<&str> for libsql compatibility
1088        let agent_id = execution.agent_id.as_deref();
1089        let output = execution.output.as_deref();
1090        let tool_calls = execution.tool_calls.as_deref();
1091        let error_message = execution.error_message.as_deref();
1092
1093        conn.execute(
1094            "INSERT INTO agent_executions (
1095                id, agent_id, agent_name, user_id, input, output, tool_calls,
1096                tokens_input, tokens_output, duration_ms, status, error_message, created_at
1097            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1098            params![
1099                execution.id.as_str(),
1100                agent_id,
1101                execution.agent_name.as_str(),
1102                execution.user_id.as_str(),
1103                execution.input.as_str(),
1104                output,
1105                tool_calls,
1106                execution.tokens_input,
1107                execution.tokens_output,
1108                execution.duration_ms,
1109                execution.status.as_str(),
1110                error_message,
1111                execution.created_at,
1112            ],
1113        )
1114        .await
1115        .map_err(|e| AppError::Database(format!("Failed to log agent execution: {}", e)))?;
1116
1117        Ok(())
1118    }
1119
1120    /// Get execution history for a user
1121    pub async fn get_user_executions(
1122        &self,
1123        user_id: &str,
1124        limit: u32,
1125    ) -> Result<Vec<AgentExecution>> {
1126        let conn = self.operation_conn().await?;
1127
1128        let mut rows = conn
1129            .query(
1130                "SELECT id, agent_id, agent_name, user_id, input, output, tool_calls,
1131                        tokens_input, tokens_output, duration_ms, status, error_message, created_at
1132                 FROM agent_executions WHERE user_id = ?
1133                 ORDER BY created_at DESC LIMIT ?",
1134                (user_id, limit),
1135            )
1136            .await
1137            .map_err(|e| AppError::Database(format!("Failed to query executions: {}", e)))?;
1138
1139        let mut executions = Vec::new();
1140        while let Some(row) = rows
1141            .next()
1142            .await
1143            .map_err(|e| AppError::Database(e.to_string()))?
1144        {
1145            executions.push(AgentExecution {
1146                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
1147                agent_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
1148                agent_name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
1149                user_id: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
1150                input: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
1151                output: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
1152                tool_calls: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
1153                tokens_input: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
1154                tokens_output: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
1155                duration_ms: row.get(9).map_err(|e| AppError::Database(e.to_string()))?,
1156                status: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
1157                error_message: row.get(11).map_err(|e| AppError::Database(e.to_string()))?,
1158                created_at: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
1159            });
1160        }
1161
1162        Ok(executions)
1163    }
1164}
1165
/// Registered user account
///
/// `Debug` is implemented by hand rather than derived so that
/// `password_hash` is never written into logs or panic messages.
#[derive(Clone)]
pub struct User {
    /// Unique user identifier (UUID)
    pub id: String,
    /// User's email address
    pub email: String,
    /// Argon2 hashed password
    pub password_hash: String,
    /// User's display name
    pub name: String,
    /// Unix timestamp of account creation
    pub created_at: i64,
    /// Unix timestamp of last update
    pub updated_at: i64,
}

impl std::fmt::Debug for User {
    /// Format every field as the derived implementation would, except that
    /// the value of `password_hash` is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("User")
            .field("id", &self.id)
            .field("email", &self.email)
            .field("password_hash", &"<redacted>")
            .field("name", &self.name)
            .field("created_at", &self.created_at)
            .field("updated_at", &self.updated_at)
            .finish()
    }
}
1182
/// An agent definition created by a user and persisted in the database.
///
/// This structure mirrors the TOON AgentConfig format for easy import/export.
#[derive(Clone, Debug)]
pub struct UserAgent {
    /// Identifier of this agent (UUID).
    pub id: String,
    /// Identifier of the user who created the agent.
    pub user_id: String,
    /// Name of the agent (unique per user).
    pub name: String,
    /// Optional human-readable name for display.
    pub display_name: Option<String>,
    /// Optional free-text description of the agent.
    pub description: Option<String>,
    /// Identifier of the LLM model to use (e.g., "gpt-4", "llama3.2").
    pub model: String,
    /// Optional system prompt defining the agent's behavior.
    pub system_prompt: Option<String>,
    /// Tool names encoded as a JSON array: ["calculator", "web_search"].
    pub tools: String,
    /// Upper bound on iterations of the tool-use loop.
    pub max_tool_iterations: i32,
    /// Whether tools are executed in parallel.
    pub parallel_tools: bool,
    /// Additional configuration encoded as a JSON object.
    pub extra: String,
    /// Whether the agent is visible in the community marketplace.
    pub is_public: bool,
    /// How many times the agent has been used.
    pub usage_count: i32,
    /// Sum of all ratings the agent has received.
    pub rating_sum: i32,
    /// How many ratings the agent has received.
    pub rating_count: i32,
    /// Creation time as a Unix timestamp.
    pub created_at: i64,
    /// Last-update time as a Unix timestamp.
    pub updated_at: i64,
}
1222
1223impl UserAgent {
1224    /// Create a new UserAgent with required fields
1225    pub fn new(id: String, user_id: String, name: String, model: String) -> Self {
1226        let now = Utc::now().timestamp();
1227        Self {
1228            id,
1229            user_id,
1230            name,
1231            display_name: None,
1232            description: None,
1233            model,
1234            system_prompt: None,
1235            tools: "[]".to_string(),
1236            max_tool_iterations: 10,
1237            parallel_tools: false,
1238            extra: "{}".to_string(),
1239            is_public: false,
1240            usage_count: 0,
1241            rating_sum: 0,
1242            rating_count: 0,
1243            created_at: now,
1244            updated_at: now,
1245        }
1246    }
1247
1248    /// Get tools as a `Vec<String>`
1249    pub fn tools_vec(&self) -> Vec<String> {
1250        serde_json::from_str(&self.tools).unwrap_or_default()
1251    }
1252
1253    /// Set tools from a `Vec<String>`
1254    pub fn set_tools(&mut self, tools: Vec<String>) {
1255        self.tools = serde_json::to_string(&tools).unwrap_or_else(|_| "[]".to_string());
1256    }
1257
1258    /// Calculate average rating (returns None if no ratings)
1259    pub fn average_rating(&self) -> Option<f32> {
1260        if self.rating_count > 0 {
1261            Some(self.rating_sum as f32 / self.rating_count as f32)
1262        } else {
1263            None
1264        }
1265    }
1266}
1267
/// One logged run of an agent, kept for analytics.
#[derive(Clone, Debug)]
pub struct AgentExecution {
    /// Identifier of this execution (UUID).
    pub id: String,
    /// Identifier of the user agent (`None` if system agent).
    pub agent_id: Option<String>,
    /// Name of the agent (always populated).
    pub agent_name: String,
    /// Identifier of the user who triggered the execution.
    pub user_id: String,
    /// The user's input message.
    pub input: String,
    /// The agent's response (`None` if failed).
    pub output: Option<String>,
    /// Tool invocations encoded as a JSON array.
    pub tool_calls: Option<String>,
    /// Number of input tokens.
    pub tokens_input: Option<i32>,
    /// Number of output tokens.
    pub tokens_output: Option<i32>,
    /// How long the execution took, in milliseconds.
    pub duration_ms: Option<i32>,
    /// Status: "success", "error", "timeout".
    pub status: String,
    /// Error message when `status` is "error".
    pub error_message: Option<String>,
    /// Time of the execution as a Unix timestamp.
    pub created_at: i64,
}
1298
1299impl AgentExecution {
1300    /// Create a new execution log entry
1301    pub fn new(agent_name: String, user_id: String, input: String) -> Self {
1302        Self {
1303            id: uuid::Uuid::new_v4().to_string(),
1304            agent_id: None,
1305            agent_name,
1306            user_id,
1307            input,
1308            output: None,
1309            tool_calls: None,
1310            tokens_input: None,
1311            tokens_output: None,
1312            duration_ms: None,
1313            status: "pending".to_string(),
1314            error_message: None,
1315            created_at: Utc::now().timestamp(),
1316        }
1317    }
1318
1319    /// Mark execution as successful
1320    pub fn success(mut self, output: String, duration_ms: i32) -> Self {
1321        self.output = Some(output);
1322        self.duration_ms = Some(duration_ms);
1323        self.status = "success".to_string();
1324        self
1325    }
1326
1327    /// Mark execution as failed
1328    pub fn error(mut self, error: String) -> Self {
1329        self.error_message = Some(error);
1330        self.status = "error".to_string();
1331        self
1332    }
1333}