1use crate::types::{AppError, MemoryFact, Message, MessageRole, Preference, Result};
2use chrono::Utc;
3use libsql::{params, Builder, Connection, Database};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7#[derive(Debug, Clone)]
9pub struct Conversation {
10 pub id: String,
12 pub user_id: String,
14 pub title: Option<String>,
16 pub message_count: i32,
18 pub created_at: String,
20 pub updated_at: String,
22}
23
24pub struct TursoClient {
29 db: Database,
30 cached_conn: Arc<Mutex<Option<Connection>>>,
32 is_memory: bool,
33}
34
35impl TursoClient {
36 pub async fn new_remote(url: String, auth_token: String) -> Result<Self> {
38 let db = Builder::new_remote(url, auth_token)
39 .build()
40 .await
41 .map_err(|e| AppError::Database(format!("Failed to connect to Turso: {}", e)))?;
42
43 let client = Self {
44 db,
45 cached_conn: Arc::new(Mutex::new(None)),
46 is_memory: false,
47 };
48 client.initialize_schema().await?;
49
50 Ok(client)
51 }
52
53 pub async fn new_local(path: &str) -> Result<Self> {
55 let is_memory = path == ":memory:";
56 let db = Builder::new_local(path)
57 .build()
58 .await
59 .map_err(|e| AppError::Database(format!("Failed to open local database: {}", e)))?;
60
61 let client = Self {
62 db,
63 cached_conn: Arc::new(Mutex::new(None)),
64 is_memory,
65 };
66
67 if is_memory {
70 let conn = client
71 .db
72 .connect()
73 .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))?;
74 *client.cached_conn.lock().await = Some(conn);
75 }
76
77 client.initialize_schema().await?;
78
79 Ok(client)
80 }
81
82 pub async fn new_memory() -> Result<Self> {
84 Self::new_local(":memory:").await
85 }
86
87 pub async fn new(url: String, auth_token: String) -> Result<Self> {
89 if url.starts_with("file:") || url.ends_with(".db") || url == ":memory:" {
91 Self::new_local(&url).await
92 } else if url.starts_with("libsql://") || url.starts_with("https://") {
93 Self::new_remote(url, auth_token).await
94 } else {
95 Self::new_local(&url).await
97 }
98 }
99
100 pub fn connection(&self) -> Result<Connection> {
102 self.db
103 .connect()
104 .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))
105 }
106
107 pub async fn operation_conn(&self) -> Result<Connection> {
109 if self.is_memory {
110 let guard = self.cached_conn.lock().await;
111 guard.as_ref().cloned().ok_or_else(|| {
112 AppError::Database("No cached connection for in-memory database".to_string())
113 })
114 } else {
115 self.connection()
116 }
117 }
118
119 async fn initialize_schema(&self) -> Result<()> {
120 let conn = self.operation_conn().await?;
121
122 conn.execute(
124 "CREATE TABLE IF NOT EXISTS users (
125 id TEXT PRIMARY KEY,
126 email TEXT UNIQUE NOT NULL,
127 password_hash TEXT NOT NULL,
128 name TEXT NOT NULL,
129 created_at INTEGER NOT NULL,
130 updated_at INTEGER NOT NULL
131 )",
132 (),
133 )
134 .await
135 .map_err(|e| AppError::Database(format!("Failed to create users table: {}", e)))?;
136
137 conn.execute(
139 "CREATE TABLE IF NOT EXISTS sessions (
140 id TEXT PRIMARY KEY,
141 user_id TEXT NOT NULL,
142 token_hash TEXT NOT NULL,
143 expires_at INTEGER NOT NULL,
144 created_at INTEGER NOT NULL,
145 FOREIGN KEY (user_id) REFERENCES users(id)
146 )",
147 (),
148 )
149 .await
150 .map_err(|e| AppError::Database(format!("Failed to create sessions table: {}", e)))?;
151
152 conn.execute(
154 "CREATE TABLE IF NOT EXISTS conversations (
155 id TEXT PRIMARY KEY,
156 user_id TEXT NOT NULL,
157 title TEXT,
158 created_at INTEGER NOT NULL,
159 updated_at INTEGER NOT NULL,
160 FOREIGN KEY (user_id) REFERENCES users(id)
161 )",
162 (),
163 )
164 .await
165 .map_err(|e| AppError::Database(format!("Failed to create conversations table: {}", e)))?;
166
167 conn.execute(
169 "CREATE TABLE IF NOT EXISTS messages (
170 id TEXT PRIMARY KEY,
171 conversation_id TEXT NOT NULL,
172 role TEXT NOT NULL,
173 content TEXT NOT NULL,
174 timestamp INTEGER NOT NULL,
175 FOREIGN KEY (conversation_id) REFERENCES conversations(id)
176 )",
177 (),
178 )
179 .await
180 .map_err(|e| AppError::Database(format!("Failed to create messages table: {}", e)))?;
181
182 conn.execute(
184 "CREATE TABLE IF NOT EXISTS memory_facts (
185 id TEXT PRIMARY KEY,
186 user_id TEXT NOT NULL,
187 category TEXT NOT NULL,
188 fact_key TEXT NOT NULL,
189 fact_value TEXT NOT NULL,
190 confidence REAL NOT NULL,
191 created_at INTEGER NOT NULL,
192 updated_at INTEGER NOT NULL,
193 FOREIGN KEY (user_id) REFERENCES users(id)
194 )",
195 (),
196 )
197 .await
198 .map_err(|e| AppError::Database(format!("Failed to create memory_facts table: {}", e)))?;
199
200 conn.execute(
202 "CREATE TABLE IF NOT EXISTS preferences (
203 id TEXT PRIMARY KEY,
204 user_id TEXT NOT NULL,
205 category TEXT NOT NULL,
206 key TEXT NOT NULL,
207 value TEXT NOT NULL,
208 confidence REAL NOT NULL,
209 created_at INTEGER NOT NULL,
210 FOREIGN KEY (user_id) REFERENCES users(id),
211 UNIQUE(user_id, category, key)
212 )",
213 (),
214 )
215 .await
216 .map_err(|e| AppError::Database(format!("Failed to create preferences table: {}", e)))?;
217
218 conn.execute(
220 "CREATE TABLE IF NOT EXISTS user_agents (
221 id TEXT PRIMARY KEY,
222 user_id TEXT NOT NULL,
223 name TEXT NOT NULL,
224 display_name TEXT,
225 description TEXT,
226 model TEXT NOT NULL,
227 system_prompt TEXT,
228 tools TEXT DEFAULT '[]',
229 max_tool_iterations INTEGER DEFAULT 10,
230 parallel_tools INTEGER DEFAULT 0,
231 extra TEXT DEFAULT '{}',
232 is_public INTEGER DEFAULT 0,
233 usage_count INTEGER DEFAULT 0,
234 rating_sum INTEGER DEFAULT 0,
235 rating_count INTEGER DEFAULT 0,
236 created_at INTEGER NOT NULL,
237 updated_at INTEGER NOT NULL,
238 FOREIGN KEY (user_id) REFERENCES users(id),
239 UNIQUE(user_id, name)
240 )",
241 (),
242 )
243 .await
244 .map_err(|e| AppError::Database(format!("Failed to create user_agents table: {}", e)))?;
245
246 conn.execute(
248 "CREATE INDEX IF NOT EXISTS idx_user_agents_lookup ON user_agents(user_id, name)",
249 (),
250 )
251 .await
252 .map_err(|e| {
253 AppError::Database(format!("Failed to create user_agents_lookup index: {}", e))
254 })?;
255
256 conn.execute(
258 "CREATE INDEX IF NOT EXISTS idx_user_agents_public ON user_agents(is_public, usage_count DESC)",
259 (),
260 )
261 .await
262 .map_err(|e| {
263 AppError::Database(format!("Failed to create user_agents_public index: {}", e))
264 })?;
265
266 conn.execute(
268 "CREATE TABLE IF NOT EXISTS user_tools (
269 id TEXT PRIMARY KEY,
270 user_id TEXT NOT NULL,
271 name TEXT NOT NULL,
272 display_name TEXT,
273 description TEXT,
274 enabled INTEGER DEFAULT 1,
275 timeout_secs INTEGER DEFAULT 30,
276 tool_type TEXT NOT NULL,
277 config TEXT DEFAULT '{}',
278 parameters TEXT DEFAULT '{}',
279 extra TEXT DEFAULT '{}',
280 is_public INTEGER DEFAULT 0,
281 usage_count INTEGER DEFAULT 0,
282 created_at INTEGER NOT NULL,
283 updated_at INTEGER NOT NULL,
284 FOREIGN KEY (user_id) REFERENCES users(id),
285 UNIQUE(user_id, name)
286 )",
287 (),
288 )
289 .await
290 .map_err(|e| AppError::Database(format!("Failed to create user_tools table: {}", e)))?;
291
292 conn.execute(
294 "CREATE TABLE IF NOT EXISTS user_mcps (
295 id TEXT PRIMARY KEY,
296 user_id TEXT NOT NULL,
297 name TEXT NOT NULL,
298 enabled INTEGER DEFAULT 1,
299 command TEXT NOT NULL,
300 args TEXT DEFAULT '[]',
301 env TEXT DEFAULT '{}',
302 timeout_secs INTEGER DEFAULT 30,
303 is_public INTEGER DEFAULT 0,
304 created_at INTEGER NOT NULL,
305 updated_at INTEGER NOT NULL,
306 FOREIGN KEY (user_id) REFERENCES users(id),
307 UNIQUE(user_id, name)
308 )",
309 (),
310 )
311 .await
312 .map_err(|e| AppError::Database(format!("Failed to create user_mcps table: {}", e)))?;
313
314 conn.execute(
316 "CREATE TABLE IF NOT EXISTS agent_executions (
317 id TEXT PRIMARY KEY,
318 agent_id TEXT,
319 agent_name TEXT NOT NULL,
320 user_id TEXT NOT NULL,
321 input TEXT NOT NULL,
322 output TEXT,
323 tool_calls TEXT,
324 tokens_input INTEGER,
325 tokens_output INTEGER,
326 duration_ms INTEGER,
327 status TEXT NOT NULL,
328 error_message TEXT,
329 created_at INTEGER NOT NULL,
330 FOREIGN KEY (user_id) REFERENCES users(id)
331 )",
332 (),
333 )
334 .await
335 .map_err(|e| {
336 AppError::Database(format!("Failed to create agent_executions table: {}", e))
337 })?;
338
339 conn.execute(
341 "CREATE INDEX IF NOT EXISTS idx_executions_user ON agent_executions(user_id, created_at DESC)",
342 (),
343 )
344 .await
345 .map_err(|e| AppError::Database(format!("Failed to create executions_user index: {}", e)))?;
346
347 conn.execute(
348 "CREATE INDEX IF NOT EXISTS idx_executions_agent ON agent_executions(agent_name, created_at DESC)",
349 (),
350 )
351 .await
352 .map_err(|e| {
353 AppError::Database(format!("Failed to create executions_agent index: {}", e))
354 })?;
355
356 Ok(())
357 }
358
359 pub async fn create_user(
367 &self,
368 id: &str,
369 email: &str,
370 password_hash: &str,
371 name: &str,
372 ) -> Result<()> {
373 let conn = self.operation_conn().await?;
374 let now = Utc::now().timestamp();
375
376 conn.execute(
377 "INSERT INTO users (id, email, password_hash, name, created_at, updated_at)
378 VALUES (?, ?, ?, ?, ?, ?)",
379 (id, email, password_hash, name, now, now),
380 )
381 .await
382 .map_err(|e| AppError::Database(format!("Failed to create user: {}", e)))?;
383
384 Ok(())
385 }
386
387 pub async fn get_user_by_email(&self, email: &str) -> Result<Option<User>> {
389 let conn = self.operation_conn().await?;
390
391 let mut rows = conn
392 .query(
393 "SELECT id, email, password_hash, name, created_at, updated_at
394 FROM users WHERE email = ?",
395 [email],
396 )
397 .await
398 .map_err(|e| AppError::Database(format!("Failed to query user: {}", e)))?;
399
400 if let Some(row) = rows
401 .next()
402 .await
403 .map_err(|e| AppError::Database(e.to_string()))?
404 {
405 Ok(Some(User {
406 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
407 email: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
408 password_hash: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
409 name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
410 created_at: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
411 updated_at: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
412 }))
413 } else {
414 Ok(None)
415 }
416 }
417
418 pub async fn create_session(
426 &self,
427 id: &str,
428 user_id: &str,
429 token_hash: &str,
430 expires_at: i64,
431 ) -> Result<()> {
432 let conn = self.operation_conn().await?;
433 let now = Utc::now().timestamp();
434
435 conn.execute(
436 "INSERT INTO sessions (id, user_id, token_hash, expires_at, created_at)
437 VALUES (?, ?, ?, ?, ?)",
438 (id, user_id, token_hash, expires_at, now),
439 )
440 .await
441 .map_err(|e| AppError::Database(format!("Failed to create session: {}", e)))?;
442
443 Ok(())
444 }
445
446 pub async fn create_conversation(
448 &self,
449 id: &str,
450 user_id: &str,
451 title: Option<&str>,
452 ) -> Result<()> {
453 let conn = self.operation_conn().await?;
454 let now = Utc::now().timestamp();
455
456 conn.execute(
457 "INSERT INTO conversations (id, user_id, title, created_at, updated_at)
458 VALUES (?, ?, ?, ?, ?)",
459 (id, user_id, title, now, now),
460 )
461 .await
462 .map_err(|e| AppError::Database(format!("Failed to create conversation: {}", e)))?;
463
464 Ok(())
465 }
466
467 pub async fn conversation_exists(&self, conversation_id: &str) -> Result<bool> {
469 let conn = self.operation_conn().await?;
470
471 let mut rows = conn
472 .query(
473 "SELECT 1 FROM conversations WHERE id = ?",
474 [conversation_id],
475 )
476 .await
477 .map_err(|e| AppError::Database(format!("Failed to check conversation: {}", e)))?;
478
479 Ok(rows
480 .next()
481 .await
482 .map_err(|e| AppError::Database(e.to_string()))?
483 .is_some())
484 }
485
486 pub async fn add_message(
488 &self,
489 id: &str,
490 conversation_id: &str,
491 role: MessageRole,
492 content: &str,
493 ) -> Result<()> {
494 let conn = self.operation_conn().await?;
495 let now = Utc::now().timestamp();
496 let role_str = match role {
497 MessageRole::System => "system",
498 MessageRole::User => "user",
499 MessageRole::Assistant => "assistant",
500 };
501
502 conn.execute(
503 "INSERT INTO messages (id, conversation_id, role, content, timestamp)
504 VALUES (?, ?, ?, ?, ?)",
505 (id, conversation_id, role_str, content, now),
506 )
507 .await
508 .map_err(|e| AppError::Database(format!("Failed to add message: {}", e)))?;
509
510 Ok(())
511 }
512
513 pub async fn get_conversation_history(&self, conversation_id: &str) -> Result<Vec<Message>> {
515 let conn = self.operation_conn().await?;
516
517 let mut rows = conn
518 .query(
519 "SELECT role, content, timestamp FROM messages
520 WHERE conversation_id = ? ORDER BY timestamp ASC",
521 [conversation_id],
522 )
523 .await
524 .map_err(|e| AppError::Database(format!("Failed to query messages: {}", e)))?;
525
526 let mut messages = Vec::new();
527 while let Some(row) = rows
528 .next()
529 .await
530 .map_err(|e| AppError::Database(e.to_string()))?
531 {
532 let role_str: String = row.get(0).map_err(|e| AppError::Database(e.to_string()))?;
533 let role = match role_str.as_str() {
534 "system" => MessageRole::System,
535 "user" => MessageRole::User,
536 "assistant" => MessageRole::Assistant,
537 _ => MessageRole::User,
538 };
539
540 messages.push(Message {
541 role,
542 content: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
543 timestamp: chrono::DateTime::from_timestamp(
544 row.get::<i64>(2)
545 .map_err(|e| AppError::Database(e.to_string()))?,
546 0,
547 )
548 .unwrap(),
549 });
550 }
551
552 Ok(messages)
553 }
554
555 pub async fn get_conversation(&self, conversation_id: &str) -> Result<Conversation> {
557 let conn = self.operation_conn().await?;
558
559 let mut rows = conn
560 .query(
561 "SELECT id, user_id, title, created_at, updated_at FROM conversations WHERE id = ?",
562 [conversation_id],
563 )
564 .await
565 .map_err(|e| AppError::Database(format!("Failed to query conversation: {}", e)))?;
566
567 if let Some(row) = rows
568 .next()
569 .await
570 .map_err(|e| AppError::Database(e.to_string()))?
571 {
572 let created_ts: i64 = row.get(3).map_err(|e| AppError::Database(e.to_string()))?;
573 let updated_ts: i64 = row.get(4).map_err(|e| AppError::Database(e.to_string()))?;
574
575 Ok(Conversation {
576 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
577 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
578 title: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
579 message_count: 0, created_at: chrono::DateTime::from_timestamp(created_ts, 0)
581 .map(|dt| dt.to_rfc3339())
582 .unwrap_or_default(),
583 updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
584 .map(|dt| dt.to_rfc3339())
585 .unwrap_or_default(),
586 })
587 } else {
588 Err(AppError::NotFound(format!(
589 "Conversation {} not found",
590 conversation_id
591 )))
592 }
593 }
594
595 pub async fn get_user_conversations(&self, user_id: &str) -> Result<Vec<Conversation>> {
597 let conn = self.operation_conn().await?;
598
599 let mut rows = conn
600 .query(
601 "SELECT c.id, c.user_id, c.title, c.created_at, c.updated_at,
602 (SELECT COUNT(*) FROM messages m WHERE m.conversation_id = c.id) as msg_count
603 FROM conversations c
604 WHERE c.user_id = ?
605 ORDER BY c.updated_at DESC",
606 [user_id],
607 )
608 .await
609 .map_err(|e| AppError::Database(format!("Failed to query conversations: {}", e)))?;
610
611 let mut conversations = Vec::new();
612 while let Some(row) = rows
613 .next()
614 .await
615 .map_err(|e| AppError::Database(e.to_string()))?
616 {
617 let created_ts: i64 = row.get(3).map_err(|e| AppError::Database(e.to_string()))?;
618 let updated_ts: i64 = row.get(4).map_err(|e| AppError::Database(e.to_string()))?;
619
620 conversations.push(Conversation {
621 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
622 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
623 title: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
624 message_count: row.get::<i32>(5).unwrap_or(0),
625 created_at: chrono::DateTime::from_timestamp(created_ts, 0)
626 .map(|dt| dt.to_rfc3339())
627 .unwrap_or_default(),
628 updated_at: chrono::DateTime::from_timestamp(updated_ts, 0)
629 .map(|dt| dt.to_rfc3339())
630 .unwrap_or_default(),
631 });
632 }
633
634 Ok(conversations)
635 }
636
637 pub async fn update_conversation_title(
639 &self,
640 conversation_id: &str,
641 title: Option<&str>,
642 ) -> Result<()> {
643 let conn = self.operation_conn().await?;
644 let now = Utc::now().timestamp();
645
646 conn.execute(
647 "UPDATE conversations SET title = ?, updated_at = ? WHERE id = ?",
648 (title, now, conversation_id),
649 )
650 .await
651 .map_err(|e| AppError::Database(format!("Failed to update conversation: {}", e)))?;
652
653 Ok(())
654 }
655
656 pub async fn delete_conversation(&self, conversation_id: &str) -> Result<()> {
658 let conn = self.operation_conn().await?;
659
660 conn.execute(
662 "DELETE FROM messages WHERE conversation_id = ?",
663 [conversation_id],
664 )
665 .await
666 .map_err(|e| AppError::Database(format!("Failed to delete messages: {}", e)))?;
667
668 conn.execute("DELETE FROM conversations WHERE id = ?", [conversation_id])
670 .await
671 .map_err(|e| AppError::Database(format!("Failed to delete conversation: {}", e)))?;
672
673 Ok(())
674 }
675
676 pub async fn store_memory_fact(&self, fact: &MemoryFact) -> Result<()> {
678 let conn = self.operation_conn().await?;
679
680 conn.execute(
681 "INSERT OR REPLACE INTO memory_facts
682 (id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at)
683 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
684 (
685 fact.id.as_str(),
686 fact.user_id.as_str(),
687 fact.category.as_str(),
688 fact.fact_key.as_str(),
689 fact.fact_value.as_str(),
690 fact.confidence as f64,
691 fact.created_at.timestamp(),
692 fact.updated_at.timestamp(),
693 ),
694 )
695 .await
696 .map_err(|e| AppError::Database(format!("Failed to store memory fact: {}", e)))?;
697
698 Ok(())
699 }
700
701 pub async fn get_user_memory(&self, user_id: &str) -> Result<Vec<MemoryFact>> {
703 let conn = self.operation_conn().await?;
704
705 let mut rows = conn
706 .query(
707 "SELECT id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at
708 FROM memory_facts WHERE user_id = ?",
709 [user_id],
710 )
711 .await
712 .map_err(|e| AppError::Database(format!("Failed to query memory facts: {}", e)))?;
713
714 let mut facts = Vec::new();
715 while let Some(row) = rows
716 .next()
717 .await
718 .map_err(|e| AppError::Database(e.to_string()))?
719 {
720 facts.push(MemoryFact {
721 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
722 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
723 category: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
724 fact_key: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
725 fact_value: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
726 confidence: row
727 .get::<f64>(5)
728 .map_err(|e| AppError::Database(e.to_string()))?
729 as f32,
730 created_at: chrono::DateTime::from_timestamp(
731 row.get::<i64>(6)
732 .map_err(|e| AppError::Database(e.to_string()))?,
733 0,
734 )
735 .unwrap(),
736 updated_at: chrono::DateTime::from_timestamp(
737 row.get::<i64>(7)
738 .map_err(|e| AppError::Database(e.to_string()))?,
739 0,
740 )
741 .unwrap(),
742 });
743 }
744
745 Ok(facts)
746 }
747
748 pub async fn store_preference(&self, user_id: &str, preference: &Preference) -> Result<()> {
750 let conn = self.operation_conn().await?;
751 let now = Utc::now().timestamp();
752 let id = uuid::Uuid::new_v4().to_string();
753
754 conn.execute(
755 "INSERT OR REPLACE INTO preferences
756 (id, user_id, category, key, value, confidence, created_at)
757 VALUES (?, ?, ?, ?, ?, ?, ?)",
758 (
759 id,
760 user_id,
761 preference.category.as_str(),
762 preference.key.as_str(),
763 preference.value.as_str(),
764 preference.confidence as f64,
765 now,
766 ),
767 )
768 .await
769 .map_err(|e| AppError::Database(format!("Failed to store preference: {}", e)))?;
770
771 Ok(())
772 }
773
774 pub async fn get_user_preferences(&self, user_id: &str) -> Result<Vec<Preference>> {
776 let conn = self.operation_conn().await?;
777
778 let mut rows = conn
779 .query(
780 "SELECT category, key, value, confidence FROM preferences WHERE user_id = ?",
781 [user_id],
782 )
783 .await
784 .map_err(|e| AppError::Database(format!("Failed to query preferences: {}", e)))?;
785
786 let mut preferences = Vec::new();
787 while let Some(row) = rows
788 .next()
789 .await
790 .map_err(|e| AppError::Database(e.to_string()))?
791 {
792 preferences.push(Preference {
793 category: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
794 key: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
795 value: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
796 confidence: row
797 .get::<f64>(3)
798 .map_err(|e| AppError::Database(e.to_string()))?
799 as f32,
800 });
801 }
802
803 Ok(preferences)
804 }
805
806 pub async fn create_user_agent(&self, agent: &UserAgent) -> Result<()> {
810 let conn = self.operation_conn().await?;
811
812 let display_name = agent.display_name.as_deref();
814 let description = agent.description.as_deref();
815 let system_prompt = agent.system_prompt.as_deref();
816
817 conn.execute(
818 "INSERT INTO user_agents (
819 id, user_id, name, display_name, description, model, system_prompt,
820 tools, max_tool_iterations, parallel_tools, extra, is_public,
821 usage_count, rating_sum, rating_count, created_at, updated_at
822 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
823 params![
824 agent.id.as_str(),
825 agent.user_id.as_str(),
826 agent.name.as_str(),
827 display_name,
828 description,
829 agent.model.as_str(),
830 system_prompt,
831 agent.tools.as_str(),
832 agent.max_tool_iterations,
833 agent.parallel_tools as i32,
834 agent.extra.as_str(),
835 agent.is_public as i32,
836 agent.usage_count,
837 agent.rating_sum,
838 agent.rating_count,
839 agent.created_at,
840 agent.updated_at,
841 ],
842 )
843 .await
844 .map_err(|e| AppError::Database(format!("Failed to create user agent: {}", e)))?;
845
846 Ok(())
847 }
848
849 pub async fn get_user_agent(&self, id: &str) -> Result<Option<UserAgent>> {
851 let conn = self.operation_conn().await?;
852
853 let mut rows = conn
854 .query(
855 "SELECT id, user_id, name, display_name, description, model, system_prompt,
856 tools, max_tool_iterations, parallel_tools, extra, is_public,
857 usage_count, rating_sum, rating_count, created_at, updated_at
858 FROM user_agents WHERE id = ?",
859 [id],
860 )
861 .await
862 .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
863
864 if let Some(row) = rows
865 .next()
866 .await
867 .map_err(|e| AppError::Database(e.to_string()))?
868 {
869 Ok(Some(Self::row_to_user_agent(&row)?))
870 } else {
871 Ok(None)
872 }
873 }
874
875 pub async fn get_user_agent_by_name(
877 &self,
878 user_id: &str,
879 name: &str,
880 ) -> Result<Option<UserAgent>> {
881 let conn = self.operation_conn().await?;
882
883 let mut rows = conn
884 .query(
885 "SELECT id, user_id, name, display_name, description, model, system_prompt,
886 tools, max_tool_iterations, parallel_tools, extra, is_public,
887 usage_count, rating_sum, rating_count, created_at, updated_at
888 FROM user_agents WHERE user_id = ? AND name = ?",
889 (user_id, name),
890 )
891 .await
892 .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
893
894 if let Some(row) = rows
895 .next()
896 .await
897 .map_err(|e| AppError::Database(e.to_string()))?
898 {
899 Ok(Some(Self::row_to_user_agent(&row)?))
900 } else {
901 Ok(None)
902 }
903 }
904
905 pub async fn get_public_agent_by_name(&self, name: &str) -> Result<Option<UserAgent>> {
907 let conn = self.operation_conn().await?;
908
909 let mut rows = conn
910 .query(
911 "SELECT id, user_id, name, display_name, description, model, system_prompt,
912 tools, max_tool_iterations, parallel_tools, extra, is_public,
913 usage_count, rating_sum, rating_count, created_at, updated_at
914 FROM user_agents WHERE name = ? AND is_public = 1
915 ORDER BY usage_count DESC LIMIT 1",
916 [name],
917 )
918 .await
919 .map_err(|e| AppError::Database(format!("Failed to query public agent: {}", e)))?;
920
921 if let Some(row) = rows
922 .next()
923 .await
924 .map_err(|e| AppError::Database(e.to_string()))?
925 {
926 Ok(Some(Self::row_to_user_agent(&row)?))
927 } else {
928 Ok(None)
929 }
930 }
931
932 pub async fn list_user_agents(&self, user_id: &str) -> Result<Vec<UserAgent>> {
934 let conn = self.operation_conn().await?;
935
936 let mut rows = conn
937 .query(
938 "SELECT id, user_id, name, display_name, description, model, system_prompt,
939 tools, max_tool_iterations, parallel_tools, extra, is_public,
940 usage_count, rating_sum, rating_count, created_at, updated_at
941 FROM user_agents WHERE user_id = ? ORDER BY updated_at DESC",
942 [user_id],
943 )
944 .await
945 .map_err(|e| AppError::Database(format!("Failed to list user agents: {}", e)))?;
946
947 let mut agents = Vec::new();
948 while let Some(row) = rows
949 .next()
950 .await
951 .map_err(|e| AppError::Database(e.to_string()))?
952 {
953 agents.push(Self::row_to_user_agent(&row)?);
954 }
955
956 Ok(agents)
957 }
958
959 pub async fn list_public_agents(&self, limit: u32, offset: u32) -> Result<Vec<UserAgent>> {
961 let conn = self.operation_conn().await?;
962
963 let mut rows = conn
964 .query(
965 "SELECT id, user_id, name, display_name, description, model, system_prompt,
966 tools, max_tool_iterations, parallel_tools, extra, is_public,
967 usage_count, rating_sum, rating_count, created_at, updated_at
968 FROM user_agents WHERE is_public = 1
969 ORDER BY usage_count DESC LIMIT ? OFFSET ?",
970 (limit, offset),
971 )
972 .await
973 .map_err(|e| AppError::Database(format!("Failed to list public agents: {}", e)))?;
974
975 let mut agents = Vec::new();
976 while let Some(row) = rows
977 .next()
978 .await
979 .map_err(|e| AppError::Database(e.to_string()))?
980 {
981 agents.push(Self::row_to_user_agent(&row)?);
982 }
983
984 Ok(agents)
985 }
986
987 pub async fn update_user_agent(&self, agent: &UserAgent) -> Result<()> {
989 let conn = self.operation_conn().await?;
990
991 let display_name = agent.display_name.as_deref();
993 let description = agent.description.as_deref();
994 let system_prompt = agent.system_prompt.as_deref();
995
996 conn.execute(
997 "UPDATE user_agents SET
998 display_name = ?1, description = ?2, model = ?3, system_prompt = ?4,
999 tools = ?5, max_tool_iterations = ?6, parallel_tools = ?7, extra = ?8,
1000 is_public = ?9, updated_at = ?10
1001 WHERE id = ?11 AND user_id = ?12",
1002 params![
1003 display_name,
1004 description,
1005 agent.model.as_str(),
1006 system_prompt,
1007 agent.tools.as_str(),
1008 agent.max_tool_iterations,
1009 agent.parallel_tools as i32,
1010 agent.extra.as_str(),
1011 agent.is_public as i32,
1012 agent.updated_at,
1013 agent.id.as_str(),
1014 agent.user_id.as_str(),
1015 ],
1016 )
1017 .await
1018 .map_err(|e| AppError::Database(format!("Failed to update user agent: {}", e)))?;
1019
1020 Ok(())
1021 }
1022
1023 pub async fn delete_user_agent(&self, id: &str, user_id: &str) -> Result<bool> {
1025 let conn = self.operation_conn().await?;
1026
1027 let affected = conn
1028 .execute(
1029 "DELETE FROM user_agents WHERE id = ? AND user_id = ?",
1030 (id, user_id),
1031 )
1032 .await
1033 .map_err(|e| AppError::Database(format!("Failed to delete user agent: {}", e)))?;
1034
1035 Ok(affected > 0)
1036 }
1037
1038 pub async fn increment_agent_usage(&self, id: &str) -> Result<()> {
1040 let conn = self.operation_conn().await?;
1041
1042 conn.execute(
1043 "UPDATE user_agents SET usage_count = usage_count + 1 WHERE id = ?",
1044 [id],
1045 )
1046 .await
1047 .map_err(|e| AppError::Database(format!("Failed to increment agent usage: {}", e)))?;
1048
1049 Ok(())
1050 }
1051
1052 fn row_to_user_agent(row: &libsql::Row) -> Result<UserAgent> {
1054 Ok(UserAgent {
1055 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
1056 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
1057 name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
1058 display_name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
1059 description: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
1060 model: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
1061 system_prompt: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
1062 tools: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
1063 max_tool_iterations: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
1064 parallel_tools: row
1065 .get::<i32>(9)
1066 .map_err(|e| AppError::Database(e.to_string()))?
1067 != 0,
1068 extra: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
1069 is_public: row
1070 .get::<i32>(11)
1071 .map_err(|e| AppError::Database(e.to_string()))?
1072 != 0,
1073 usage_count: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
1074 rating_sum: row.get(13).map_err(|e| AppError::Database(e.to_string()))?,
1075 rating_count: row.get(14).map_err(|e| AppError::Database(e.to_string()))?,
1076 created_at: row.get(15).map_err(|e| AppError::Database(e.to_string()))?,
1077 updated_at: row.get(16).map_err(|e| AppError::Database(e.to_string()))?,
1078 })
1079 }
1080
1081 pub async fn log_agent_execution(&self, execution: &AgentExecution) -> Result<()> {
1085 let conn = self.operation_conn().await?;
1086
1087 let agent_id = execution.agent_id.as_deref();
1089 let output = execution.output.as_deref();
1090 let tool_calls = execution.tool_calls.as_deref();
1091 let error_message = execution.error_message.as_deref();
1092
1093 conn.execute(
1094 "INSERT INTO agent_executions (
1095 id, agent_id, agent_name, user_id, input, output, tool_calls,
1096 tokens_input, tokens_output, duration_ms, status, error_message, created_at
1097 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
1098 params![
1099 execution.id.as_str(),
1100 agent_id,
1101 execution.agent_name.as_str(),
1102 execution.user_id.as_str(),
1103 execution.input.as_str(),
1104 output,
1105 tool_calls,
1106 execution.tokens_input,
1107 execution.tokens_output,
1108 execution.duration_ms,
1109 execution.status.as_str(),
1110 error_message,
1111 execution.created_at,
1112 ],
1113 )
1114 .await
1115 .map_err(|e| AppError::Database(format!("Failed to log agent execution: {}", e)))?;
1116
1117 Ok(())
1118 }
1119
1120 pub async fn get_user_executions(
1122 &self,
1123 user_id: &str,
1124 limit: u32,
1125 ) -> Result<Vec<AgentExecution>> {
1126 let conn = self.operation_conn().await?;
1127
1128 let mut rows = conn
1129 .query(
1130 "SELECT id, agent_id, agent_name, user_id, input, output, tool_calls,
1131 tokens_input, tokens_output, duration_ms, status, error_message, created_at
1132 FROM agent_executions WHERE user_id = ?
1133 ORDER BY created_at DESC LIMIT ?",
1134 (user_id, limit),
1135 )
1136 .await
1137 .map_err(|e| AppError::Database(format!("Failed to query executions: {}", e)))?;
1138
1139 let mut executions = Vec::new();
1140 while let Some(row) = rows
1141 .next()
1142 .await
1143 .map_err(|e| AppError::Database(e.to_string()))?
1144 {
1145 executions.push(AgentExecution {
1146 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
1147 agent_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
1148 agent_name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
1149 user_id: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
1150 input: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
1151 output: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
1152 tool_calls: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
1153 tokens_input: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
1154 tokens_output: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
1155 duration_ms: row.get(9).map_err(|e| AppError::Database(e.to_string()))?,
1156 status: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
1157 error_message: row.get(11).map_err(|e| AppError::Database(e.to_string()))?,
1158 created_at: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
1159 });
1160 }
1161
1162 Ok(executions)
1163 }
1164}
1165
1166#[derive(Debug, Clone)]
1168pub struct User {
1169 pub id: String,
1171 pub email: String,
1173 pub password_hash: String,
1175 pub name: String,
1177 pub created_at: i64,
1179 pub updated_at: i64,
1181}
1182
1183#[derive(Debug, Clone)]
1186pub struct UserAgent {
1187 pub id: String,
1189 pub user_id: String,
1191 pub name: String,
1193 pub display_name: Option<String>,
1195 pub description: Option<String>,
1197 pub model: String,
1199 pub system_prompt: Option<String>,
1201 pub tools: String,
1203 pub max_tool_iterations: i32,
1205 pub parallel_tools: bool,
1207 pub extra: String,
1209 pub is_public: bool,
1211 pub usage_count: i32,
1213 pub rating_sum: i32,
1215 pub rating_count: i32,
1217 pub created_at: i64,
1219 pub updated_at: i64,
1221}
1222
1223impl UserAgent {
1224 pub fn new(id: String, user_id: String, name: String, model: String) -> Self {
1226 let now = Utc::now().timestamp();
1227 Self {
1228 id,
1229 user_id,
1230 name,
1231 display_name: None,
1232 description: None,
1233 model,
1234 system_prompt: None,
1235 tools: "[]".to_string(),
1236 max_tool_iterations: 10,
1237 parallel_tools: false,
1238 extra: "{}".to_string(),
1239 is_public: false,
1240 usage_count: 0,
1241 rating_sum: 0,
1242 rating_count: 0,
1243 created_at: now,
1244 updated_at: now,
1245 }
1246 }
1247
1248 pub fn tools_vec(&self) -> Vec<String> {
1250 serde_json::from_str(&self.tools).unwrap_or_default()
1251 }
1252
1253 pub fn set_tools(&mut self, tools: Vec<String>) {
1255 self.tools = serde_json::to_string(&tools).unwrap_or_else(|_| "[]".to_string());
1256 }
1257
1258 pub fn average_rating(&self) -> Option<f32> {
1260 if self.rating_count > 0 {
1261 Some(self.rating_sum as f32 / self.rating_count as f32)
1262 } else {
1263 None
1264 }
1265 }
1266}
1267
1268#[derive(Debug, Clone)]
1270pub struct AgentExecution {
1271 pub id: String,
1273 pub agent_id: Option<String>,
1275 pub agent_name: String,
1277 pub user_id: String,
1279 pub input: String,
1281 pub output: Option<String>,
1283 pub tool_calls: Option<String>,
1285 pub tokens_input: Option<i32>,
1287 pub tokens_output: Option<i32>,
1289 pub duration_ms: Option<i32>,
1291 pub status: String,
1293 pub error_message: Option<String>,
1295 pub created_at: i64,
1297}
1298
1299impl AgentExecution {
1300 pub fn new(agent_name: String, user_id: String, input: String) -> Self {
1302 Self {
1303 id: uuid::Uuid::new_v4().to_string(),
1304 agent_id: None,
1305 agent_name,
1306 user_id,
1307 input,
1308 output: None,
1309 tool_calls: None,
1310 tokens_input: None,
1311 tokens_output: None,
1312 duration_ms: None,
1313 status: "pending".to_string(),
1314 error_message: None,
1315 created_at: Utc::now().timestamp(),
1316 }
1317 }
1318
1319 pub fn success(mut self, output: String, duration_ms: i32) -> Self {
1321 self.output = Some(output);
1322 self.duration_ms = Some(duration_ms);
1323 self.status = "success".to_string();
1324 self
1325 }
1326
1327 pub fn error(mut self, error: String) -> Self {
1329 self.error_message = Some(error);
1330 self.status = "error".to_string();
1331 self
1332 }
1333}