ares/db/
turso.rs

1use crate::types::{AppError, MemoryFact, Message, MessageRole, Preference, Result};
2use chrono::Utc;
3use libsql::{params, Builder, Connection, Database};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
/// Database client built on a libsql [`Database`] handle.
///
/// Constructed through `new`, `new_remote`, `new_local` or `new_memory`;
/// each constructor runs schema initialization before returning.
pub struct TursoClient {
    /// libsql database handle from which connections are opened.
    db: Database,
    /// Cached connection for in-memory databases to ensure schema persists.
    /// Populated by `new_local` only when the path is `:memory:`; `None` otherwise.
    cached_conn: Arc<Mutex<Option<Connection>>>,
    /// `true` when the client was opened on the `:memory:` path, in which case
    /// `operation_conn` hands out clones of `cached_conn` instead of new connections.
    is_memory: bool,
}
13
14impl TursoClient {
15    /// Create a new TursoClient with remote Turso database
16    pub async fn new_remote(url: String, auth_token: String) -> Result<Self> {
17        let db = Builder::new_remote(url, auth_token)
18            .build()
19            .await
20            .map_err(|e| AppError::Database(format!("Failed to connect to Turso: {}", e)))?;
21
22        let client = Self {
23            db,
24            cached_conn: Arc::new(Mutex::new(None)),
25            is_memory: false,
26        };
27        client.initialize_schema().await?;
28
29        Ok(client)
30    }
31
32    /// Create a new TursoClient with local SQLite database
33    pub async fn new_local(path: &str) -> Result<Self> {
34        let is_memory = path == ":memory:";
35        let db = Builder::new_local(path)
36            .build()
37            .await
38            .map_err(|e| AppError::Database(format!("Failed to open local database: {}", e)))?;
39
40        let client = Self {
41            db,
42            cached_conn: Arc::new(Mutex::new(None)),
43            is_memory,
44        };
45
46        // For in-memory databases, we need to cache the connection
47        // so that schema persists across calls
48        if is_memory {
49            let conn = client
50                .db
51                .connect()
52                .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))?;
53            *client.cached_conn.lock().await = Some(conn);
54        }
55
56        client.initialize_schema().await?;
57
58        Ok(client)
59    }
60
61    /// Create a new TursoClient with in-memory database (useful for testing)
62    pub async fn new_memory() -> Result<Self> {
63        Self::new_local(":memory:").await
64    }
65
66    /// Create client based on URL format - routes to local or remote
67    pub async fn new(url: String, auth_token: String) -> Result<Self> {
68        // If URL starts with "file:" or is a path, use local mode
69        if url.starts_with("file:") || url.ends_with(".db") || url == ":memory:" {
70            Self::new_local(&url).await
71        } else if url.starts_with("libsql://") || url.starts_with("https://") {
72            Self::new_remote(url, auth_token).await
73        } else {
74            // Default to local with the URL as path
75            Self::new_local(&url).await
76        }
77    }
78
79    pub fn connection(&self) -> Result<Connection> {
80        self.db
81            .connect()
82            .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))
83    }
84
85    /// Get the connection to use for operations (handles in-memory vs file-based)
86    pub async fn operation_conn(&self) -> Result<Connection> {
87        if self.is_memory {
88            let guard = self.cached_conn.lock().await;
89            guard.as_ref().cloned().ok_or_else(|| {
90                AppError::Database("No cached connection for in-memory database".to_string())
91            })
92        } else {
93            self.connection()
94        }
95    }
96
97    async fn initialize_schema(&self) -> Result<()> {
98        let conn = self.operation_conn().await?;
99
100        // Users table
101        conn.execute(
102            "CREATE TABLE IF NOT EXISTS users (
103                id TEXT PRIMARY KEY,
104                email TEXT UNIQUE NOT NULL,
105                password_hash TEXT NOT NULL,
106                name TEXT NOT NULL,
107                created_at INTEGER NOT NULL,
108                updated_at INTEGER NOT NULL
109            )",
110            (),
111        )
112        .await
113        .map_err(|e| AppError::Database(format!("Failed to create users table: {}", e)))?;
114
115        // Sessions table
116        conn.execute(
117            "CREATE TABLE IF NOT EXISTS sessions (
118                id TEXT PRIMARY KEY,
119                user_id TEXT NOT NULL,
120                token_hash TEXT NOT NULL,
121                expires_at INTEGER NOT NULL,
122                created_at INTEGER NOT NULL,
123                FOREIGN KEY (user_id) REFERENCES users(id)
124            )",
125            (),
126        )
127        .await
128        .map_err(|e| AppError::Database(format!("Failed to create sessions table: {}", e)))?;
129
130        // Conversations table
131        conn.execute(
132            "CREATE TABLE IF NOT EXISTS conversations (
133                id TEXT PRIMARY KEY,
134                user_id TEXT NOT NULL,
135                title TEXT,
136                created_at INTEGER NOT NULL,
137                updated_at INTEGER NOT NULL,
138                FOREIGN KEY (user_id) REFERENCES users(id)
139            )",
140            (),
141        )
142        .await
143        .map_err(|e| AppError::Database(format!("Failed to create conversations table: {}", e)))?;
144
145        // Messages table
146        conn.execute(
147            "CREATE TABLE IF NOT EXISTS messages (
148                id TEXT PRIMARY KEY,
149                conversation_id TEXT NOT NULL,
150                role TEXT NOT NULL,
151                content TEXT NOT NULL,
152                timestamp INTEGER NOT NULL,
153                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
154            )",
155            (),
156        )
157        .await
158        .map_err(|e| AppError::Database(format!("Failed to create messages table: {}", e)))?;
159
160        // Memory facts table
161        conn.execute(
162            "CREATE TABLE IF NOT EXISTS memory_facts (
163                id TEXT PRIMARY KEY,
164                user_id TEXT NOT NULL,
165                category TEXT NOT NULL,
166                fact_key TEXT NOT NULL,
167                fact_value TEXT NOT NULL,
168                confidence REAL NOT NULL,
169                created_at INTEGER NOT NULL,
170                updated_at INTEGER NOT NULL,
171                FOREIGN KEY (user_id) REFERENCES users(id)
172            )",
173            (),
174        )
175        .await
176        .map_err(|e| AppError::Database(format!("Failed to create memory_facts table: {}", e)))?;
177
178        // Preferences table
179        conn.execute(
180            "CREATE TABLE IF NOT EXISTS preferences (
181                id TEXT PRIMARY KEY,
182                user_id TEXT NOT NULL,
183                category TEXT NOT NULL,
184                key TEXT NOT NULL,
185                value TEXT NOT NULL,
186                confidence REAL NOT NULL,
187                created_at INTEGER NOT NULL,
188                FOREIGN KEY (user_id) REFERENCES users(id),
189                UNIQUE(user_id, category, key)
190            )",
191            (),
192        )
193        .await
194        .map_err(|e| AppError::Database(format!("Failed to create preferences table: {}", e)))?;
195
196        // User-created agents table (stores TOON-compatible agent configs)
197        conn.execute(
198            "CREATE TABLE IF NOT EXISTS user_agents (
199                id TEXT PRIMARY KEY,
200                user_id TEXT NOT NULL,
201                name TEXT NOT NULL,
202                display_name TEXT,
203                description TEXT,
204                model TEXT NOT NULL,
205                system_prompt TEXT,
206                tools TEXT DEFAULT '[]',
207                max_tool_iterations INTEGER DEFAULT 10,
208                parallel_tools INTEGER DEFAULT 0,
209                extra TEXT DEFAULT '{}',
210                is_public INTEGER DEFAULT 0,
211                usage_count INTEGER DEFAULT 0,
212                rating_sum INTEGER DEFAULT 0,
213                rating_count INTEGER DEFAULT 0,
214                created_at INTEGER NOT NULL,
215                updated_at INTEGER NOT NULL,
216                FOREIGN KEY (user_id) REFERENCES users(id),
217                UNIQUE(user_id, name)
218            )",
219            (),
220        )
221        .await
222        .map_err(|e| AppError::Database(format!("Failed to create user_agents table: {}", e)))?;
223
224        // Create index for agent lookup
225        conn.execute(
226            "CREATE INDEX IF NOT EXISTS idx_user_agents_lookup ON user_agents(user_id, name)",
227            (),
228        )
229        .await
230        .map_err(|e| {
231            AppError::Database(format!("Failed to create user_agents_lookup index: {}", e))
232        })?;
233
234        // Create index for public agent discovery
235        conn.execute(
236            "CREATE INDEX IF NOT EXISTS idx_user_agents_public ON user_agents(is_public, usage_count DESC)",
237            (),
238        )
239        .await
240        .map_err(|e| {
241            AppError::Database(format!("Failed to create user_agents_public index: {}", e))
242        })?;
243
244        // User-created tools table
245        conn.execute(
246            "CREATE TABLE IF NOT EXISTS user_tools (
247                id TEXT PRIMARY KEY,
248                user_id TEXT NOT NULL,
249                name TEXT NOT NULL,
250                display_name TEXT,
251                description TEXT,
252                enabled INTEGER DEFAULT 1,
253                timeout_secs INTEGER DEFAULT 30,
254                tool_type TEXT NOT NULL,
255                config TEXT DEFAULT '{}',
256                parameters TEXT DEFAULT '{}',
257                extra TEXT DEFAULT '{}',
258                is_public INTEGER DEFAULT 0,
259                usage_count INTEGER DEFAULT 0,
260                created_at INTEGER NOT NULL,
261                updated_at INTEGER NOT NULL,
262                FOREIGN KEY (user_id) REFERENCES users(id),
263                UNIQUE(user_id, name)
264            )",
265            (),
266        )
267        .await
268        .map_err(|e| AppError::Database(format!("Failed to create user_tools table: {}", e)))?;
269
270        // User-created MCP configurations table
271        conn.execute(
272            "CREATE TABLE IF NOT EXISTS user_mcps (
273                id TEXT PRIMARY KEY,
274                user_id TEXT NOT NULL,
275                name TEXT NOT NULL,
276                enabled INTEGER DEFAULT 1,
277                command TEXT NOT NULL,
278                args TEXT DEFAULT '[]',
279                env TEXT DEFAULT '{}',
280                timeout_secs INTEGER DEFAULT 30,
281                is_public INTEGER DEFAULT 0,
282                created_at INTEGER NOT NULL,
283                updated_at INTEGER NOT NULL,
284                FOREIGN KEY (user_id) REFERENCES users(id),
285                UNIQUE(user_id, name)
286            )",
287            (),
288        )
289        .await
290        .map_err(|e| AppError::Database(format!("Failed to create user_mcps table: {}", e)))?;
291
292        // Agent execution logs for analytics
293        conn.execute(
294            "CREATE TABLE IF NOT EXISTS agent_executions (
295                id TEXT PRIMARY KEY,
296                agent_id TEXT,
297                agent_name TEXT NOT NULL,
298                user_id TEXT NOT NULL,
299                input TEXT NOT NULL,
300                output TEXT,
301                tool_calls TEXT,
302                tokens_input INTEGER,
303                tokens_output INTEGER,
304                duration_ms INTEGER,
305                status TEXT NOT NULL,
306                error_message TEXT,
307                created_at INTEGER NOT NULL,
308                FOREIGN KEY (user_id) REFERENCES users(id)
309            )",
310            (),
311        )
312        .await
313        .map_err(|e| {
314            AppError::Database(format!("Failed to create agent_executions table: {}", e))
315        })?;
316
317        // Create indexes for execution logs
318        conn.execute(
319            "CREATE INDEX IF NOT EXISTS idx_executions_user ON agent_executions(user_id, created_at DESC)",
320            (),
321        )
322        .await
323        .map_err(|e| AppError::Database(format!("Failed to create executions_user index: {}", e)))?;
324
325        conn.execute(
326            "CREATE INDEX IF NOT EXISTS idx_executions_agent ON agent_executions(agent_name, created_at DESC)",
327            (),
328        )
329        .await
330        .map_err(|e| {
331            AppError::Database(format!("Failed to create executions_agent index: {}", e))
332        })?;
333
334        Ok(())
335    }
336
337    // User operations
338    pub async fn create_user(
339        &self,
340        id: &str,
341        email: &str,
342        password_hash: &str,
343        name: &str,
344    ) -> Result<()> {
345        let conn = self.operation_conn().await?;
346        let now = Utc::now().timestamp();
347
348        conn.execute(
349            "INSERT INTO users (id, email, password_hash, name, created_at, updated_at)
350              VALUES (?, ?, ?, ?, ?, ?)",
351            (id, email, password_hash, name, now, now),
352        )
353        .await
354        .map_err(|e| AppError::Database(format!("Failed to create user: {}", e)))?;
355
356        Ok(())
357    }
358
359    pub async fn get_user_by_email(&self, email: &str) -> Result<Option<User>> {
360        let conn = self.operation_conn().await?;
361
362        let mut rows = conn
363            .query(
364                "SELECT id, email, password_hash, name, created_at, updated_at
365                 FROM users WHERE email = ?",
366                [email],
367            )
368            .await
369            .map_err(|e| AppError::Database(format!("Failed to query user: {}", e)))?;
370
371        if let Some(row) = rows
372            .next()
373            .await
374            .map_err(|e| AppError::Database(e.to_string()))?
375        {
376            Ok(Some(User {
377                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
378                email: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
379                password_hash: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
380                name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
381                created_at: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
382                updated_at: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
383            }))
384        } else {
385            Ok(None)
386        }
387    }
388
389    // Session operations
390    pub async fn create_session(
391        &self,
392        id: &str,
393        user_id: &str,
394        token_hash: &str,
395        expires_at: i64,
396    ) -> Result<()> {
397        let conn = self.operation_conn().await?;
398        let now = Utc::now().timestamp();
399
400        conn.execute(
401            "INSERT INTO sessions (id, user_id, token_hash, expires_at, created_at)
402             VALUES (?, ?, ?, ?, ?)",
403            (id, user_id, token_hash, expires_at, now),
404        )
405        .await
406        .map_err(|e| AppError::Database(format!("Failed to create session: {}", e)))?;
407
408        Ok(())
409    }
410
411    // Conversation operations
412    pub async fn create_conversation(
413        &self,
414        id: &str,
415        user_id: &str,
416        title: Option<&str>,
417    ) -> Result<()> {
418        let conn = self.operation_conn().await?;
419        let now = Utc::now().timestamp();
420
421        conn.execute(
422            "INSERT INTO conversations (id, user_id, title, created_at, updated_at)
423             VALUES (?, ?, ?, ?, ?)",
424            (id, user_id, title, now, now),
425        )
426        .await
427        .map_err(|e| AppError::Database(format!("Failed to create conversation: {}", e)))?;
428
429        Ok(())
430    }
431
432    pub async fn conversation_exists(&self, conversation_id: &str) -> Result<bool> {
433        let conn = self.operation_conn().await?;
434
435        let mut rows = conn
436            .query(
437                "SELECT 1 FROM conversations WHERE id = ?",
438                [conversation_id],
439            )
440            .await
441            .map_err(|e| AppError::Database(format!("Failed to check conversation: {}", e)))?;
442
443        Ok(rows
444            .next()
445            .await
446            .map_err(|e| AppError::Database(e.to_string()))?
447            .is_some())
448    }
449
450    pub async fn add_message(
451        &self,
452        id: &str,
453        conversation_id: &str,
454        role: MessageRole,
455        content: &str,
456    ) -> Result<()> {
457        let conn = self.operation_conn().await?;
458        let now = Utc::now().timestamp();
459        let role_str = match role {
460            MessageRole::System => "system",
461            MessageRole::User => "user",
462            MessageRole::Assistant => "assistant",
463        };
464
465        conn.execute(
466            "INSERT INTO messages (id, conversation_id, role, content, timestamp)
467             VALUES (?, ?, ?, ?, ?)",
468            (id, conversation_id, role_str, content, now),
469        )
470        .await
471        .map_err(|e| AppError::Database(format!("Failed to add message: {}", e)))?;
472
473        Ok(())
474    }
475
476    pub async fn get_conversation_history(&self, conversation_id: &str) -> Result<Vec<Message>> {
477        let conn = self.operation_conn().await?;
478
479        let mut rows = conn
480            .query(
481                "SELECT role, content, timestamp FROM messages
482                 WHERE conversation_id = ? ORDER BY timestamp ASC",
483                [conversation_id],
484            )
485            .await
486            .map_err(|e| AppError::Database(format!("Failed to query messages: {}", e)))?;
487
488        let mut messages = Vec::new();
489        while let Some(row) = rows
490            .next()
491            .await
492            .map_err(|e| AppError::Database(e.to_string()))?
493        {
494            let role_str: String = row.get(0).map_err(|e| AppError::Database(e.to_string()))?;
495            let role = match role_str.as_str() {
496                "system" => MessageRole::System,
497                "user" => MessageRole::User,
498                "assistant" => MessageRole::Assistant,
499                _ => MessageRole::User,
500            };
501
502            messages.push(Message {
503                role,
504                content: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
505                timestamp: chrono::DateTime::from_timestamp(
506                    row.get::<i64>(2)
507                        .map_err(|e| AppError::Database(e.to_string()))?,
508                    0,
509                )
510                .unwrap(),
511            });
512        }
513
514        Ok(messages)
515    }
516
517    // Memory operations
518    pub async fn store_memory_fact(&self, fact: &MemoryFact) -> Result<()> {
519        let conn = self.operation_conn().await?;
520
521        conn.execute(
522            "INSERT OR REPLACE INTO memory_facts
523            (id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at)
524            VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
525            (
526                fact.id.as_str(),
527                fact.user_id.as_str(),
528                fact.category.as_str(),
529                fact.fact_key.as_str(),
530                fact.fact_value.as_str(),
531                fact.confidence as f64,
532                fact.created_at.timestamp(),
533                fact.updated_at.timestamp(),
534            ),
535        )
536        .await
537        .map_err(|e| AppError::Database(format!("Failed to store memory fact: {}", e)))?;
538
539        Ok(())
540    }
541
542    pub async fn get_user_memory(&self, user_id: &str) -> Result<Vec<MemoryFact>> {
543        let conn = self.operation_conn().await?;
544
545        let mut rows = conn
546            .query(
547                "SELECT id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at
548                FROM memory_facts WHERE user_id = ?",
549                [user_id],
550            )
551            .await
552            .map_err(|e| AppError::Database(format!("Failed to query memory facts: {}", e)))?;
553
554        let mut facts = Vec::new();
555        while let Some(row) = rows
556            .next()
557            .await
558            .map_err(|e| AppError::Database(e.to_string()))?
559        {
560            facts.push(MemoryFact {
561                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
562                user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
563                category: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
564                fact_key: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
565                fact_value: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
566                confidence: row
567                    .get::<f64>(5)
568                    .map_err(|e| AppError::Database(e.to_string()))?
569                    as f32,
570                created_at: chrono::DateTime::from_timestamp(
571                    row.get::<i64>(6)
572                        .map_err(|e| AppError::Database(e.to_string()))?,
573                    0,
574                )
575                .unwrap(),
576                updated_at: chrono::DateTime::from_timestamp(
577                    row.get::<i64>(7)
578                        .map_err(|e| AppError::Database(e.to_string()))?,
579                    0,
580                )
581                .unwrap(),
582            });
583        }
584
585        Ok(facts)
586    }
587
588    pub async fn store_preference(&self, user_id: &str, preference: &Preference) -> Result<()> {
589        let conn = self.operation_conn().await?;
590        let now = Utc::now().timestamp();
591        let id = uuid::Uuid::new_v4().to_string();
592
593        conn.execute(
594            "INSERT OR REPLACE INTO preferences
595             (id, user_id, category, key, value, confidence, created_at)
596             VALUES (?, ?, ?, ?, ?, ?, ?)",
597            (
598                id,
599                user_id,
600                preference.category.as_str(),
601                preference.key.as_str(),
602                preference.value.as_str(),
603                preference.confidence as f64,
604                now,
605            ),
606        )
607        .await
608        .map_err(|e| AppError::Database(format!("Failed to store preference: {}", e)))?;
609
610        Ok(())
611    }
612
613    pub async fn get_user_preferences(&self, user_id: &str) -> Result<Vec<Preference>> {
614        let conn = self.operation_conn().await?;
615
616        let mut rows = conn
617            .query(
618                "SELECT category, key, value, confidence FROM preferences WHERE user_id = ?",
619                [user_id],
620            )
621            .await
622            .map_err(|e| AppError::Database(format!("Failed to query preferences: {}", e)))?;
623
624        let mut preferences = Vec::new();
625        while let Some(row) = rows
626            .next()
627            .await
628            .map_err(|e| AppError::Database(e.to_string()))?
629        {
630            preferences.push(Preference {
631                category: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
632                key: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
633                value: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
634                confidence: row
635                    .get::<f64>(3)
636                    .map_err(|e| AppError::Database(e.to_string()))?
637                    as f32,
638            });
639        }
640
641        Ok(preferences)
642    }
643
644    // ============= User Agent Operations =============
645
646    /// Create a new user-defined agent
647    pub async fn create_user_agent(&self, agent: &UserAgent) -> Result<()> {
648        let conn = self.operation_conn().await?;
649
650        // Convert Option<String> to Option<&str> for libsql compatibility
651        let display_name = agent.display_name.as_deref();
652        let description = agent.description.as_deref();
653        let system_prompt = agent.system_prompt.as_deref();
654
655        conn.execute(
656            "INSERT INTO user_agents (
657                id, user_id, name, display_name, description, model, system_prompt,
658                tools, max_tool_iterations, parallel_tools, extra, is_public,
659                usage_count, rating_sum, rating_count, created_at, updated_at
660            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
661            params![
662                agent.id.as_str(),
663                agent.user_id.as_str(),
664                agent.name.as_str(),
665                display_name,
666                description,
667                agent.model.as_str(),
668                system_prompt,
669                agent.tools.as_str(),
670                agent.max_tool_iterations,
671                agent.parallel_tools as i32,
672                agent.extra.as_str(),
673                agent.is_public as i32,
674                agent.usage_count,
675                agent.rating_sum,
676                agent.rating_count,
677                agent.created_at,
678                agent.updated_at,
679            ],
680        )
681        .await
682        .map_err(|e| AppError::Database(format!("Failed to create user agent: {}", e)))?;
683
684        Ok(())
685    }
686
687    /// Get a user agent by ID
688    pub async fn get_user_agent(&self, id: &str) -> Result<Option<UserAgent>> {
689        let conn = self.operation_conn().await?;
690
691        let mut rows = conn
692            .query(
693                "SELECT id, user_id, name, display_name, description, model, system_prompt,
694                        tools, max_tool_iterations, parallel_tools, extra, is_public,
695                        usage_count, rating_sum, rating_count, created_at, updated_at
696                 FROM user_agents WHERE id = ?",
697                [id],
698            )
699            .await
700            .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
701
702        if let Some(row) = rows
703            .next()
704            .await
705            .map_err(|e| AppError::Database(e.to_string()))?
706        {
707            Ok(Some(Self::row_to_user_agent(&row)?))
708        } else {
709            Ok(None)
710        }
711    }
712
713    /// Get a user agent by user_id and name
714    pub async fn get_user_agent_by_name(
715        &self,
716        user_id: &str,
717        name: &str,
718    ) -> Result<Option<UserAgent>> {
719        let conn = self.operation_conn().await?;
720
721        let mut rows = conn
722            .query(
723                "SELECT id, user_id, name, display_name, description, model, system_prompt,
724                        tools, max_tool_iterations, parallel_tools, extra, is_public,
725                        usage_count, rating_sum, rating_count, created_at, updated_at
726                 FROM user_agents WHERE user_id = ? AND name = ?",
727                (user_id, name),
728            )
729            .await
730            .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
731
732        if let Some(row) = rows
733            .next()
734            .await
735            .map_err(|e| AppError::Database(e.to_string()))?
736        {
737            Ok(Some(Self::row_to_user_agent(&row)?))
738        } else {
739            Ok(None)
740        }
741    }
742
743    /// Get a public agent by name (for community discovery)
744    pub async fn get_public_agent_by_name(&self, name: &str) -> Result<Option<UserAgent>> {
745        let conn = self.operation_conn().await?;
746
747        let mut rows = conn
748            .query(
749                "SELECT id, user_id, name, display_name, description, model, system_prompt,
750                        tools, max_tool_iterations, parallel_tools, extra, is_public,
751                        usage_count, rating_sum, rating_count, created_at, updated_at
752                 FROM user_agents WHERE name = ? AND is_public = 1
753                 ORDER BY usage_count DESC LIMIT 1",
754                [name],
755            )
756            .await
757            .map_err(|e| AppError::Database(format!("Failed to query public agent: {}", e)))?;
758
759        if let Some(row) = rows
760            .next()
761            .await
762            .map_err(|e| AppError::Database(e.to_string()))?
763        {
764            Ok(Some(Self::row_to_user_agent(&row)?))
765        } else {
766            Ok(None)
767        }
768    }
769
770    /// List all agents for a user
771    pub async fn list_user_agents(&self, user_id: &str) -> Result<Vec<UserAgent>> {
772        let conn = self.operation_conn().await?;
773
774        let mut rows = conn
775            .query(
776                "SELECT id, user_id, name, display_name, description, model, system_prompt,
777                        tools, max_tool_iterations, parallel_tools, extra, is_public,
778                        usage_count, rating_sum, rating_count, created_at, updated_at
779                 FROM user_agents WHERE user_id = ? ORDER BY updated_at DESC",
780                [user_id],
781            )
782            .await
783            .map_err(|e| AppError::Database(format!("Failed to list user agents: {}", e)))?;
784
785        let mut agents = Vec::new();
786        while let Some(row) = rows
787            .next()
788            .await
789            .map_err(|e| AppError::Database(e.to_string()))?
790        {
791            agents.push(Self::row_to_user_agent(&row)?);
792        }
793
794        Ok(agents)
795    }
796
797    /// List public agents (community/marketplace)
798    pub async fn list_public_agents(&self, limit: u32, offset: u32) -> Result<Vec<UserAgent>> {
799        let conn = self.operation_conn().await?;
800
801        let mut rows = conn
802            .query(
803                "SELECT id, user_id, name, display_name, description, model, system_prompt,
804                        tools, max_tool_iterations, parallel_tools, extra, is_public,
805                        usage_count, rating_sum, rating_count, created_at, updated_at
806                 FROM user_agents WHERE is_public = 1
807                 ORDER BY usage_count DESC LIMIT ? OFFSET ?",
808                (limit, offset),
809            )
810            .await
811            .map_err(|e| AppError::Database(format!("Failed to list public agents: {}", e)))?;
812
813        let mut agents = Vec::new();
814        while let Some(row) = rows
815            .next()
816            .await
817            .map_err(|e| AppError::Database(e.to_string()))?
818        {
819            agents.push(Self::row_to_user_agent(&row)?);
820        }
821
822        Ok(agents)
823    }
824
825    /// Update a user agent
826    pub async fn update_user_agent(&self, agent: &UserAgent) -> Result<()> {
827        let conn = self.operation_conn().await?;
828
829        // Convert Option<String> to Option<&str> for libsql compatibility
830        let display_name = agent.display_name.as_deref();
831        let description = agent.description.as_deref();
832        let system_prompt = agent.system_prompt.as_deref();
833
834        conn.execute(
835            "UPDATE user_agents SET
836                display_name = ?1, description = ?2, model = ?3, system_prompt = ?4,
837                tools = ?5, max_tool_iterations = ?6, parallel_tools = ?7, extra = ?8,
838                is_public = ?9, updated_at = ?10
839             WHERE id = ?11 AND user_id = ?12",
840            params![
841                display_name,
842                description,
843                agent.model.as_str(),
844                system_prompt,
845                agent.tools.as_str(),
846                agent.max_tool_iterations,
847                agent.parallel_tools as i32,
848                agent.extra.as_str(),
849                agent.is_public as i32,
850                agent.updated_at,
851                agent.id.as_str(),
852                agent.user_id.as_str(),
853            ],
854        )
855        .await
856        .map_err(|e| AppError::Database(format!("Failed to update user agent: {}", e)))?;
857
858        Ok(())
859    }
860
861    /// Delete a user agent
862    pub async fn delete_user_agent(&self, id: &str, user_id: &str) -> Result<bool> {
863        let conn = self.operation_conn().await?;
864
865        let affected = conn
866            .execute(
867                "DELETE FROM user_agents WHERE id = ? AND user_id = ?",
868                (id, user_id),
869            )
870            .await
871            .map_err(|e| AppError::Database(format!("Failed to delete user agent: {}", e)))?;
872
873        Ok(affected > 0)
874    }
875
876    /// Increment usage count for an agent
877    pub async fn increment_agent_usage(&self, id: &str) -> Result<()> {
878        let conn = self.operation_conn().await?;
879
880        conn.execute(
881            "UPDATE user_agents SET usage_count = usage_count + 1 WHERE id = ?",
882            [id],
883        )
884        .await
885        .map_err(|e| AppError::Database(format!("Failed to increment agent usage: {}", e)))?;
886
887        Ok(())
888    }
889
890    /// Helper to convert a database row to UserAgent
891    fn row_to_user_agent(row: &libsql::Row) -> Result<UserAgent> {
892        Ok(UserAgent {
893            id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
894            user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
895            name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
896            display_name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
897            description: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
898            model: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
899            system_prompt: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
900            tools: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
901            max_tool_iterations: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
902            parallel_tools: row
903                .get::<i32>(9)
904                .map_err(|e| AppError::Database(e.to_string()))?
905                != 0,
906            extra: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
907            is_public: row
908                .get::<i32>(11)
909                .map_err(|e| AppError::Database(e.to_string()))?
910                != 0,
911            usage_count: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
912            rating_sum: row.get(13).map_err(|e| AppError::Database(e.to_string()))?,
913            rating_count: row.get(14).map_err(|e| AppError::Database(e.to_string()))?,
914            created_at: row.get(15).map_err(|e| AppError::Database(e.to_string()))?,
915            updated_at: row.get(16).map_err(|e| AppError::Database(e.to_string()))?,
916        })
917    }
918
919    // ============= Agent Execution Logging =============
920
921    /// Log an agent execution for analytics
922    pub async fn log_agent_execution(&self, execution: &AgentExecution) -> Result<()> {
923        let conn = self.operation_conn().await?;
924
925        // Convert Option<String> to Option<&str> for libsql compatibility
926        let agent_id = execution.agent_id.as_deref();
927        let output = execution.output.as_deref();
928        let tool_calls = execution.tool_calls.as_deref();
929        let error_message = execution.error_message.as_deref();
930
931        conn.execute(
932            "INSERT INTO agent_executions (
933                id, agent_id, agent_name, user_id, input, output, tool_calls,
934                tokens_input, tokens_output, duration_ms, status, error_message, created_at
935            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
936            params![
937                execution.id.as_str(),
938                agent_id,
939                execution.agent_name.as_str(),
940                execution.user_id.as_str(),
941                execution.input.as_str(),
942                output,
943                tool_calls,
944                execution.tokens_input,
945                execution.tokens_output,
946                execution.duration_ms,
947                execution.status.as_str(),
948                error_message,
949                execution.created_at,
950            ],
951        )
952        .await
953        .map_err(|e| AppError::Database(format!("Failed to log agent execution: {}", e)))?;
954
955        Ok(())
956    }
957
958    /// Get execution history for a user
959    pub async fn get_user_executions(
960        &self,
961        user_id: &str,
962        limit: u32,
963    ) -> Result<Vec<AgentExecution>> {
964        let conn = self.operation_conn().await?;
965
966        let mut rows = conn
967            .query(
968                "SELECT id, agent_id, agent_name, user_id, input, output, tool_calls,
969                        tokens_input, tokens_output, duration_ms, status, error_message, created_at
970                 FROM agent_executions WHERE user_id = ?
971                 ORDER BY created_at DESC LIMIT ?",
972                (user_id, limit),
973            )
974            .await
975            .map_err(|e| AppError::Database(format!("Failed to query executions: {}", e)))?;
976
977        let mut executions = Vec::new();
978        while let Some(row) = rows
979            .next()
980            .await
981            .map_err(|e| AppError::Database(e.to_string()))?
982        {
983            executions.push(AgentExecution {
984                id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
985                agent_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
986                agent_name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
987                user_id: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
988                input: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
989                output: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
990                tool_calls: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
991                tokens_input: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
992                tokens_output: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
993                duration_ms: row.get(9).map_err(|e| AppError::Database(e.to_string()))?,
994                status: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
995                error_message: row.get(11).map_err(|e| AppError::Database(e.to_string()))?,
996                created_at: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
997            });
998        }
999
1000        Ok(executions)
1001    }
1002}
1003
/// A user record: identifier, email, password hash, name and timestamps.
///
/// `Debug` is implemented by hand rather than derived so that
/// `password_hash` is never written out by `{:?}` formatting (logs, panic
/// messages, test failures). All other fields are printed as usual.
#[derive(Clone)]
pub struct User {
    pub id: String,
    pub email: String,
    /// Stored password hash. Redacted in `Debug` output.
    pub password_hash: String,
    pub name: String,
    // NOTE(review): presumably Unix seconds like the other timestamps in this file — confirm.
    pub created_at: i64,
    pub updated_at: i64,
}

impl std::fmt::Debug for User {
    /// Formats every field except `password_hash`, which is replaced by a
    /// fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("User")
            .field("id", &self.id)
            .field("email", &self.email)
            .field("password_hash", &"[REDACTED]")
            .field("name", &self.name)
            .field("created_at", &self.created_at)
            .field("updated_at", &self.updated_at)
            .finish()
    }
}
1013
/// User-created agent stored in the database.
///
/// This structure mirrors the TOON AgentConfig format for easy import/export.
/// Instances are built from `user_agents` rows by
/// `TursoClient::row_to_user_agent`, or from scratch with [`UserAgent::new`].
#[derive(Debug, Clone)]
pub struct UserAgent {
    /// Identifier of the agent row.
    pub id: String,
    /// Identifier of the owning user; update and delete match on both `id`
    /// and `user_id`.
    pub user_id: String,
    /// Agent name. Not modified by `TursoClient::update_user_agent`.
    pub name: String,
    pub display_name: Option<String>,
    pub description: Option<String>,
    pub model: String,
    pub system_prompt: Option<String>,
    /// JSON array of tool names: `["calculator", "web_search"]`
    pub tools: String,
    /// Defaults to 10 in [`UserAgent::new`].
    pub max_tool_iterations: i32,
    /// Written to and read from the database as an integer (0 = false).
    pub parallel_tools: bool,
    /// JSON object for additional configuration
    pub extra: String,
    /// Agents with this flag set are returned by
    /// `TursoClient::list_public_agents`. Written to and read from the
    /// database as an integer (0 = false).
    pub is_public: bool,
    /// Incremented by `TursoClient::increment_agent_usage`.
    pub usage_count: i32,
    /// Numerator used by [`UserAgent::average_rating`].
    pub rating_sum: i32,
    /// Denominator used by [`UserAgent::average_rating`].
    pub rating_count: i32,
    /// Set from `Utc::now().timestamp()` in [`UserAgent::new`].
    pub created_at: i64,
    /// Set from `Utc::now().timestamp()` in [`UserAgent::new`].
    pub updated_at: i64,
}
1038
1039impl UserAgent {
1040    /// Create a new UserAgent with required fields
1041    pub fn new(id: String, user_id: String, name: String, model: String) -> Self {
1042        let now = Utc::now().timestamp();
1043        Self {
1044            id,
1045            user_id,
1046            name,
1047            display_name: None,
1048            description: None,
1049            model,
1050            system_prompt: None,
1051            tools: "[]".to_string(),
1052            max_tool_iterations: 10,
1053            parallel_tools: false,
1054            extra: "{}".to_string(),
1055            is_public: false,
1056            usage_count: 0,
1057            rating_sum: 0,
1058            rating_count: 0,
1059            created_at: now,
1060            updated_at: now,
1061        }
1062    }
1063
1064    /// Get tools as a Vec<String>
1065    pub fn tools_vec(&self) -> Vec<String> {
1066        serde_json::from_str(&self.tools).unwrap_or_default()
1067    }
1068
1069    /// Set tools from a Vec<String>
1070    pub fn set_tools(&mut self, tools: Vec<String>) {
1071        self.tools = serde_json::to_string(&tools).unwrap_or_else(|_| "[]".to_string());
1072    }
1073
1074    /// Calculate average rating (returns None if no ratings)
1075    pub fn average_rating(&self) -> Option<f32> {
1076        if self.rating_count > 0 {
1077            Some(self.rating_sum as f32 / self.rating_count as f32)
1078        } else {
1079            None
1080        }
1081    }
1082}
1083
/// Agent execution log entry for analytics.
///
/// Written by `TursoClient::log_agent_execution` and read back by
/// `TursoClient::get_user_executions`; each field maps to the column of the
/// same name in `agent_executions`.
#[derive(Debug, Clone)]
pub struct AgentExecution {
    /// Generated as a random (v4) UUID string by [`AgentExecution::new`].
    pub id: String,
    /// ID of user agent (None if system agent)
    pub agent_id: Option<String>,
    /// Name of the agent (always populated)
    pub agent_name: String,
    pub user_id: String,
    pub input: String,
    /// Set by [`AgentExecution::success`]; `None` until then.
    pub output: Option<String>,
    /// JSON array of tool invocations
    pub tool_calls: Option<String>,
    // NOTE(review): presumably prompt/completion token counts — confirm with the caller.
    pub tokens_input: Option<i32>,
    pub tokens_output: Option<i32>,
    /// Set by [`AgentExecution::success`]; `None` until then.
    pub duration_ms: Option<i32>,
    /// Status: "success", "error", "timeout".
    /// [`AgentExecution::new`] starts entries as "pending".
    pub status: String,
    /// Set by [`AgentExecution::error`]; `None` until then.
    pub error_message: Option<String>,
    /// Set from `Utc::now().timestamp()` in [`AgentExecution::new`].
    pub created_at: i64,
}
1105
1106impl AgentExecution {
1107    /// Create a new execution log entry
1108    pub fn new(agent_name: String, user_id: String, input: String) -> Self {
1109        Self {
1110            id: uuid::Uuid::new_v4().to_string(),
1111            agent_id: None,
1112            agent_name,
1113            user_id,
1114            input,
1115            output: None,
1116            tool_calls: None,
1117            tokens_input: None,
1118            tokens_output: None,
1119            duration_ms: None,
1120            status: "pending".to_string(),
1121            error_message: None,
1122            created_at: Utc::now().timestamp(),
1123        }
1124    }
1125
1126    /// Mark execution as successful
1127    pub fn success(mut self, output: String, duration_ms: i32) -> Self {
1128        self.output = Some(output);
1129        self.duration_ms = Some(duration_ms);
1130        self.status = "success".to_string();
1131        self
1132    }
1133
1134    /// Mark execution as failed
1135    pub fn error(mut self, error: String) -> Self {
1136        self.error_message = Some(error);
1137        self.status = "error".to_string();
1138        self
1139    }
1140}