1use crate::types::{AppError, MemoryFact, Message, MessageRole, Preference, Result};
2use chrono::Utc;
3use libsql::{params, Builder, Connection, Database};
4use std::sync::Arc;
5use tokio::sync::Mutex;
6
7pub struct TursoClient {
8 db: Database,
9 cached_conn: Arc<Mutex<Option<Connection>>>,
11 is_memory: bool,
12}
13
14impl TursoClient {
15 pub async fn new_remote(url: String, auth_token: String) -> Result<Self> {
17 let db = Builder::new_remote(url, auth_token)
18 .build()
19 .await
20 .map_err(|e| AppError::Database(format!("Failed to connect to Turso: {}", e)))?;
21
22 let client = Self {
23 db,
24 cached_conn: Arc::new(Mutex::new(None)),
25 is_memory: false,
26 };
27 client.initialize_schema().await?;
28
29 Ok(client)
30 }
31
32 pub async fn new_local(path: &str) -> Result<Self> {
34 let is_memory = path == ":memory:";
35 let db = Builder::new_local(path)
36 .build()
37 .await
38 .map_err(|e| AppError::Database(format!("Failed to open local database: {}", e)))?;
39
40 let client = Self {
41 db,
42 cached_conn: Arc::new(Mutex::new(None)),
43 is_memory,
44 };
45
46 if is_memory {
49 let conn = client
50 .db
51 .connect()
52 .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))?;
53 *client.cached_conn.lock().await = Some(conn);
54 }
55
56 client.initialize_schema().await?;
57
58 Ok(client)
59 }
60
61 pub async fn new_memory() -> Result<Self> {
63 Self::new_local(":memory:").await
64 }
65
66 pub async fn new(url: String, auth_token: String) -> Result<Self> {
68 if url.starts_with("file:") || url.ends_with(".db") || url == ":memory:" {
70 Self::new_local(&url).await
71 } else if url.starts_with("libsql://") || url.starts_with("https://") {
72 Self::new_remote(url, auth_token).await
73 } else {
74 Self::new_local(&url).await
76 }
77 }
78
79 pub fn connection(&self) -> Result<Connection> {
80 self.db
81 .connect()
82 .map_err(|e| AppError::Database(format!("Failed to get connection: {}", e)))
83 }
84
85 pub async fn operation_conn(&self) -> Result<Connection> {
87 if self.is_memory {
88 let guard = self.cached_conn.lock().await;
89 guard.as_ref().cloned().ok_or_else(|| {
90 AppError::Database("No cached connection for in-memory database".to_string())
91 })
92 } else {
93 self.connection()
94 }
95 }
96
97 async fn initialize_schema(&self) -> Result<()> {
98 let conn = self.operation_conn().await?;
99
100 conn.execute(
102 "CREATE TABLE IF NOT EXISTS users (
103 id TEXT PRIMARY KEY,
104 email TEXT UNIQUE NOT NULL,
105 password_hash TEXT NOT NULL,
106 name TEXT NOT NULL,
107 created_at INTEGER NOT NULL,
108 updated_at INTEGER NOT NULL
109 )",
110 (),
111 )
112 .await
113 .map_err(|e| AppError::Database(format!("Failed to create users table: {}", e)))?;
114
115 conn.execute(
117 "CREATE TABLE IF NOT EXISTS sessions (
118 id TEXT PRIMARY KEY,
119 user_id TEXT NOT NULL,
120 token_hash TEXT NOT NULL,
121 expires_at INTEGER NOT NULL,
122 created_at INTEGER NOT NULL,
123 FOREIGN KEY (user_id) REFERENCES users(id)
124 )",
125 (),
126 )
127 .await
128 .map_err(|e| AppError::Database(format!("Failed to create sessions table: {}", e)))?;
129
130 conn.execute(
132 "CREATE TABLE IF NOT EXISTS conversations (
133 id TEXT PRIMARY KEY,
134 user_id TEXT NOT NULL,
135 title TEXT,
136 created_at INTEGER NOT NULL,
137 updated_at INTEGER NOT NULL,
138 FOREIGN KEY (user_id) REFERENCES users(id)
139 )",
140 (),
141 )
142 .await
143 .map_err(|e| AppError::Database(format!("Failed to create conversations table: {}", e)))?;
144
145 conn.execute(
147 "CREATE TABLE IF NOT EXISTS messages (
148 id TEXT PRIMARY KEY,
149 conversation_id TEXT NOT NULL,
150 role TEXT NOT NULL,
151 content TEXT NOT NULL,
152 timestamp INTEGER NOT NULL,
153 FOREIGN KEY (conversation_id) REFERENCES conversations(id)
154 )",
155 (),
156 )
157 .await
158 .map_err(|e| AppError::Database(format!("Failed to create messages table: {}", e)))?;
159
160 conn.execute(
162 "CREATE TABLE IF NOT EXISTS memory_facts (
163 id TEXT PRIMARY KEY,
164 user_id TEXT NOT NULL,
165 category TEXT NOT NULL,
166 fact_key TEXT NOT NULL,
167 fact_value TEXT NOT NULL,
168 confidence REAL NOT NULL,
169 created_at INTEGER NOT NULL,
170 updated_at INTEGER NOT NULL,
171 FOREIGN KEY (user_id) REFERENCES users(id)
172 )",
173 (),
174 )
175 .await
176 .map_err(|e| AppError::Database(format!("Failed to create memory_facts table: {}", e)))?;
177
178 conn.execute(
180 "CREATE TABLE IF NOT EXISTS preferences (
181 id TEXT PRIMARY KEY,
182 user_id TEXT NOT NULL,
183 category TEXT NOT NULL,
184 key TEXT NOT NULL,
185 value TEXT NOT NULL,
186 confidence REAL NOT NULL,
187 created_at INTEGER NOT NULL,
188 FOREIGN KEY (user_id) REFERENCES users(id),
189 UNIQUE(user_id, category, key)
190 )",
191 (),
192 )
193 .await
194 .map_err(|e| AppError::Database(format!("Failed to create preferences table: {}", e)))?;
195
196 conn.execute(
198 "CREATE TABLE IF NOT EXISTS user_agents (
199 id TEXT PRIMARY KEY,
200 user_id TEXT NOT NULL,
201 name TEXT NOT NULL,
202 display_name TEXT,
203 description TEXT,
204 model TEXT NOT NULL,
205 system_prompt TEXT,
206 tools TEXT DEFAULT '[]',
207 max_tool_iterations INTEGER DEFAULT 10,
208 parallel_tools INTEGER DEFAULT 0,
209 extra TEXT DEFAULT '{}',
210 is_public INTEGER DEFAULT 0,
211 usage_count INTEGER DEFAULT 0,
212 rating_sum INTEGER DEFAULT 0,
213 rating_count INTEGER DEFAULT 0,
214 created_at INTEGER NOT NULL,
215 updated_at INTEGER NOT NULL,
216 FOREIGN KEY (user_id) REFERENCES users(id),
217 UNIQUE(user_id, name)
218 )",
219 (),
220 )
221 .await
222 .map_err(|e| AppError::Database(format!("Failed to create user_agents table: {}", e)))?;
223
224 conn.execute(
226 "CREATE INDEX IF NOT EXISTS idx_user_agents_lookup ON user_agents(user_id, name)",
227 (),
228 )
229 .await
230 .map_err(|e| {
231 AppError::Database(format!("Failed to create user_agents_lookup index: {}", e))
232 })?;
233
234 conn.execute(
236 "CREATE INDEX IF NOT EXISTS idx_user_agents_public ON user_agents(is_public, usage_count DESC)",
237 (),
238 )
239 .await
240 .map_err(|e| {
241 AppError::Database(format!("Failed to create user_agents_public index: {}", e))
242 })?;
243
244 conn.execute(
246 "CREATE TABLE IF NOT EXISTS user_tools (
247 id TEXT PRIMARY KEY,
248 user_id TEXT NOT NULL,
249 name TEXT NOT NULL,
250 display_name TEXT,
251 description TEXT,
252 enabled INTEGER DEFAULT 1,
253 timeout_secs INTEGER DEFAULT 30,
254 tool_type TEXT NOT NULL,
255 config TEXT DEFAULT '{}',
256 parameters TEXT DEFAULT '{}',
257 extra TEXT DEFAULT '{}',
258 is_public INTEGER DEFAULT 0,
259 usage_count INTEGER DEFAULT 0,
260 created_at INTEGER NOT NULL,
261 updated_at INTEGER NOT NULL,
262 FOREIGN KEY (user_id) REFERENCES users(id),
263 UNIQUE(user_id, name)
264 )",
265 (),
266 )
267 .await
268 .map_err(|e| AppError::Database(format!("Failed to create user_tools table: {}", e)))?;
269
270 conn.execute(
272 "CREATE TABLE IF NOT EXISTS user_mcps (
273 id TEXT PRIMARY KEY,
274 user_id TEXT NOT NULL,
275 name TEXT NOT NULL,
276 enabled INTEGER DEFAULT 1,
277 command TEXT NOT NULL,
278 args TEXT DEFAULT '[]',
279 env TEXT DEFAULT '{}',
280 timeout_secs INTEGER DEFAULT 30,
281 is_public INTEGER DEFAULT 0,
282 created_at INTEGER NOT NULL,
283 updated_at INTEGER NOT NULL,
284 FOREIGN KEY (user_id) REFERENCES users(id),
285 UNIQUE(user_id, name)
286 )",
287 (),
288 )
289 .await
290 .map_err(|e| AppError::Database(format!("Failed to create user_mcps table: {}", e)))?;
291
292 conn.execute(
294 "CREATE TABLE IF NOT EXISTS agent_executions (
295 id TEXT PRIMARY KEY,
296 agent_id TEXT,
297 agent_name TEXT NOT NULL,
298 user_id TEXT NOT NULL,
299 input TEXT NOT NULL,
300 output TEXT,
301 tool_calls TEXT,
302 tokens_input INTEGER,
303 tokens_output INTEGER,
304 duration_ms INTEGER,
305 status TEXT NOT NULL,
306 error_message TEXT,
307 created_at INTEGER NOT NULL,
308 FOREIGN KEY (user_id) REFERENCES users(id)
309 )",
310 (),
311 )
312 .await
313 .map_err(|e| {
314 AppError::Database(format!("Failed to create agent_executions table: {}", e))
315 })?;
316
317 conn.execute(
319 "CREATE INDEX IF NOT EXISTS idx_executions_user ON agent_executions(user_id, created_at DESC)",
320 (),
321 )
322 .await
323 .map_err(|e| AppError::Database(format!("Failed to create executions_user index: {}", e)))?;
324
325 conn.execute(
326 "CREATE INDEX IF NOT EXISTS idx_executions_agent ON agent_executions(agent_name, created_at DESC)",
327 (),
328 )
329 .await
330 .map_err(|e| {
331 AppError::Database(format!("Failed to create executions_agent index: {}", e))
332 })?;
333
334 Ok(())
335 }
336
337 pub async fn create_user(
339 &self,
340 id: &str,
341 email: &str,
342 password_hash: &str,
343 name: &str,
344 ) -> Result<()> {
345 let conn = self.operation_conn().await?;
346 let now = Utc::now().timestamp();
347
348 conn.execute(
349 "INSERT INTO users (id, email, password_hash, name, created_at, updated_at)
350 VALUES (?, ?, ?, ?, ?, ?)",
351 (id, email, password_hash, name, now, now),
352 )
353 .await
354 .map_err(|e| AppError::Database(format!("Failed to create user: {}", e)))?;
355
356 Ok(())
357 }
358
359 pub async fn get_user_by_email(&self, email: &str) -> Result<Option<User>> {
360 let conn = self.operation_conn().await?;
361
362 let mut rows = conn
363 .query(
364 "SELECT id, email, password_hash, name, created_at, updated_at
365 FROM users WHERE email = ?",
366 [email],
367 )
368 .await
369 .map_err(|e| AppError::Database(format!("Failed to query user: {}", e)))?;
370
371 if let Some(row) = rows
372 .next()
373 .await
374 .map_err(|e| AppError::Database(e.to_string()))?
375 {
376 Ok(Some(User {
377 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
378 email: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
379 password_hash: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
380 name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
381 created_at: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
382 updated_at: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
383 }))
384 } else {
385 Ok(None)
386 }
387 }
388
389 pub async fn create_session(
391 &self,
392 id: &str,
393 user_id: &str,
394 token_hash: &str,
395 expires_at: i64,
396 ) -> Result<()> {
397 let conn = self.operation_conn().await?;
398 let now = Utc::now().timestamp();
399
400 conn.execute(
401 "INSERT INTO sessions (id, user_id, token_hash, expires_at, created_at)
402 VALUES (?, ?, ?, ?, ?)",
403 (id, user_id, token_hash, expires_at, now),
404 )
405 .await
406 .map_err(|e| AppError::Database(format!("Failed to create session: {}", e)))?;
407
408 Ok(())
409 }
410
411 pub async fn create_conversation(
413 &self,
414 id: &str,
415 user_id: &str,
416 title: Option<&str>,
417 ) -> Result<()> {
418 let conn = self.operation_conn().await?;
419 let now = Utc::now().timestamp();
420
421 conn.execute(
422 "INSERT INTO conversations (id, user_id, title, created_at, updated_at)
423 VALUES (?, ?, ?, ?, ?)",
424 (id, user_id, title, now, now),
425 )
426 .await
427 .map_err(|e| AppError::Database(format!("Failed to create conversation: {}", e)))?;
428
429 Ok(())
430 }
431
432 pub async fn conversation_exists(&self, conversation_id: &str) -> Result<bool> {
433 let conn = self.operation_conn().await?;
434
435 let mut rows = conn
436 .query(
437 "SELECT 1 FROM conversations WHERE id = ?",
438 [conversation_id],
439 )
440 .await
441 .map_err(|e| AppError::Database(format!("Failed to check conversation: {}", e)))?;
442
443 Ok(rows
444 .next()
445 .await
446 .map_err(|e| AppError::Database(e.to_string()))?
447 .is_some())
448 }
449
450 pub async fn add_message(
451 &self,
452 id: &str,
453 conversation_id: &str,
454 role: MessageRole,
455 content: &str,
456 ) -> Result<()> {
457 let conn = self.operation_conn().await?;
458 let now = Utc::now().timestamp();
459 let role_str = match role {
460 MessageRole::System => "system",
461 MessageRole::User => "user",
462 MessageRole::Assistant => "assistant",
463 };
464
465 conn.execute(
466 "INSERT INTO messages (id, conversation_id, role, content, timestamp)
467 VALUES (?, ?, ?, ?, ?)",
468 (id, conversation_id, role_str, content, now),
469 )
470 .await
471 .map_err(|e| AppError::Database(format!("Failed to add message: {}", e)))?;
472
473 Ok(())
474 }
475
476 pub async fn get_conversation_history(&self, conversation_id: &str) -> Result<Vec<Message>> {
477 let conn = self.operation_conn().await?;
478
479 let mut rows = conn
480 .query(
481 "SELECT role, content, timestamp FROM messages
482 WHERE conversation_id = ? ORDER BY timestamp ASC",
483 [conversation_id],
484 )
485 .await
486 .map_err(|e| AppError::Database(format!("Failed to query messages: {}", e)))?;
487
488 let mut messages = Vec::new();
489 while let Some(row) = rows
490 .next()
491 .await
492 .map_err(|e| AppError::Database(e.to_string()))?
493 {
494 let role_str: String = row.get(0).map_err(|e| AppError::Database(e.to_string()))?;
495 let role = match role_str.as_str() {
496 "system" => MessageRole::System,
497 "user" => MessageRole::User,
498 "assistant" => MessageRole::Assistant,
499 _ => MessageRole::User,
500 };
501
502 messages.push(Message {
503 role,
504 content: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
505 timestamp: chrono::DateTime::from_timestamp(
506 row.get::<i64>(2)
507 .map_err(|e| AppError::Database(e.to_string()))?,
508 0,
509 )
510 .unwrap(),
511 });
512 }
513
514 Ok(messages)
515 }
516
517 pub async fn store_memory_fact(&self, fact: &MemoryFact) -> Result<()> {
519 let conn = self.operation_conn().await?;
520
521 conn.execute(
522 "INSERT OR REPLACE INTO memory_facts
523 (id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at)
524 VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
525 (
526 fact.id.as_str(),
527 fact.user_id.as_str(),
528 fact.category.as_str(),
529 fact.fact_key.as_str(),
530 fact.fact_value.as_str(),
531 fact.confidence as f64,
532 fact.created_at.timestamp(),
533 fact.updated_at.timestamp(),
534 ),
535 )
536 .await
537 .map_err(|e| AppError::Database(format!("Failed to store memory fact: {}", e)))?;
538
539 Ok(())
540 }
541
542 pub async fn get_user_memory(&self, user_id: &str) -> Result<Vec<MemoryFact>> {
543 let conn = self.operation_conn().await?;
544
545 let mut rows = conn
546 .query(
547 "SELECT id, user_id, category, fact_key, fact_value, confidence, created_at, updated_at
548 FROM memory_facts WHERE user_id = ?",
549 [user_id],
550 )
551 .await
552 .map_err(|e| AppError::Database(format!("Failed to query memory facts: {}", e)))?;
553
554 let mut facts = Vec::new();
555 while let Some(row) = rows
556 .next()
557 .await
558 .map_err(|e| AppError::Database(e.to_string()))?
559 {
560 facts.push(MemoryFact {
561 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
562 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
563 category: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
564 fact_key: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
565 fact_value: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
566 confidence: row
567 .get::<f64>(5)
568 .map_err(|e| AppError::Database(e.to_string()))?
569 as f32,
570 created_at: chrono::DateTime::from_timestamp(
571 row.get::<i64>(6)
572 .map_err(|e| AppError::Database(e.to_string()))?,
573 0,
574 )
575 .unwrap(),
576 updated_at: chrono::DateTime::from_timestamp(
577 row.get::<i64>(7)
578 .map_err(|e| AppError::Database(e.to_string()))?,
579 0,
580 )
581 .unwrap(),
582 });
583 }
584
585 Ok(facts)
586 }
587
588 pub async fn store_preference(&self, user_id: &str, preference: &Preference) -> Result<()> {
589 let conn = self.operation_conn().await?;
590 let now = Utc::now().timestamp();
591 let id = uuid::Uuid::new_v4().to_string();
592
593 conn.execute(
594 "INSERT OR REPLACE INTO preferences
595 (id, user_id, category, key, value, confidence, created_at)
596 VALUES (?, ?, ?, ?, ?, ?, ?)",
597 (
598 id,
599 user_id,
600 preference.category.as_str(),
601 preference.key.as_str(),
602 preference.value.as_str(),
603 preference.confidence as f64,
604 now,
605 ),
606 )
607 .await
608 .map_err(|e| AppError::Database(format!("Failed to store preference: {}", e)))?;
609
610 Ok(())
611 }
612
613 pub async fn get_user_preferences(&self, user_id: &str) -> Result<Vec<Preference>> {
614 let conn = self.operation_conn().await?;
615
616 let mut rows = conn
617 .query(
618 "SELECT category, key, value, confidence FROM preferences WHERE user_id = ?",
619 [user_id],
620 )
621 .await
622 .map_err(|e| AppError::Database(format!("Failed to query preferences: {}", e)))?;
623
624 let mut preferences = Vec::new();
625 while let Some(row) = rows
626 .next()
627 .await
628 .map_err(|e| AppError::Database(e.to_string()))?
629 {
630 preferences.push(Preference {
631 category: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
632 key: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
633 value: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
634 confidence: row
635 .get::<f64>(3)
636 .map_err(|e| AppError::Database(e.to_string()))?
637 as f32,
638 });
639 }
640
641 Ok(preferences)
642 }
643
644 pub async fn create_user_agent(&self, agent: &UserAgent) -> Result<()> {
648 let conn = self.operation_conn().await?;
649
650 let display_name = agent.display_name.as_deref();
652 let description = agent.description.as_deref();
653 let system_prompt = agent.system_prompt.as_deref();
654
655 conn.execute(
656 "INSERT INTO user_agents (
657 id, user_id, name, display_name, description, model, system_prompt,
658 tools, max_tool_iterations, parallel_tools, extra, is_public,
659 usage_count, rating_sum, rating_count, created_at, updated_at
660 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
661 params![
662 agent.id.as_str(),
663 agent.user_id.as_str(),
664 agent.name.as_str(),
665 display_name,
666 description,
667 agent.model.as_str(),
668 system_prompt,
669 agent.tools.as_str(),
670 agent.max_tool_iterations,
671 agent.parallel_tools as i32,
672 agent.extra.as_str(),
673 agent.is_public as i32,
674 agent.usage_count,
675 agent.rating_sum,
676 agent.rating_count,
677 agent.created_at,
678 agent.updated_at,
679 ],
680 )
681 .await
682 .map_err(|e| AppError::Database(format!("Failed to create user agent: {}", e)))?;
683
684 Ok(())
685 }
686
687 pub async fn get_user_agent(&self, id: &str) -> Result<Option<UserAgent>> {
689 let conn = self.operation_conn().await?;
690
691 let mut rows = conn
692 .query(
693 "SELECT id, user_id, name, display_name, description, model, system_prompt,
694 tools, max_tool_iterations, parallel_tools, extra, is_public,
695 usage_count, rating_sum, rating_count, created_at, updated_at
696 FROM user_agents WHERE id = ?",
697 [id],
698 )
699 .await
700 .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
701
702 if let Some(row) = rows
703 .next()
704 .await
705 .map_err(|e| AppError::Database(e.to_string()))?
706 {
707 Ok(Some(Self::row_to_user_agent(&row)?))
708 } else {
709 Ok(None)
710 }
711 }
712
713 pub async fn get_user_agent_by_name(
715 &self,
716 user_id: &str,
717 name: &str,
718 ) -> Result<Option<UserAgent>> {
719 let conn = self.operation_conn().await?;
720
721 let mut rows = conn
722 .query(
723 "SELECT id, user_id, name, display_name, description, model, system_prompt,
724 tools, max_tool_iterations, parallel_tools, extra, is_public,
725 usage_count, rating_sum, rating_count, created_at, updated_at
726 FROM user_agents WHERE user_id = ? AND name = ?",
727 (user_id, name),
728 )
729 .await
730 .map_err(|e| AppError::Database(format!("Failed to query user agent: {}", e)))?;
731
732 if let Some(row) = rows
733 .next()
734 .await
735 .map_err(|e| AppError::Database(e.to_string()))?
736 {
737 Ok(Some(Self::row_to_user_agent(&row)?))
738 } else {
739 Ok(None)
740 }
741 }
742
743 pub async fn get_public_agent_by_name(&self, name: &str) -> Result<Option<UserAgent>> {
745 let conn = self.operation_conn().await?;
746
747 let mut rows = conn
748 .query(
749 "SELECT id, user_id, name, display_name, description, model, system_prompt,
750 tools, max_tool_iterations, parallel_tools, extra, is_public,
751 usage_count, rating_sum, rating_count, created_at, updated_at
752 FROM user_agents WHERE name = ? AND is_public = 1
753 ORDER BY usage_count DESC LIMIT 1",
754 [name],
755 )
756 .await
757 .map_err(|e| AppError::Database(format!("Failed to query public agent: {}", e)))?;
758
759 if let Some(row) = rows
760 .next()
761 .await
762 .map_err(|e| AppError::Database(e.to_string()))?
763 {
764 Ok(Some(Self::row_to_user_agent(&row)?))
765 } else {
766 Ok(None)
767 }
768 }
769
770 pub async fn list_user_agents(&self, user_id: &str) -> Result<Vec<UserAgent>> {
772 let conn = self.operation_conn().await?;
773
774 let mut rows = conn
775 .query(
776 "SELECT id, user_id, name, display_name, description, model, system_prompt,
777 tools, max_tool_iterations, parallel_tools, extra, is_public,
778 usage_count, rating_sum, rating_count, created_at, updated_at
779 FROM user_agents WHERE user_id = ? ORDER BY updated_at DESC",
780 [user_id],
781 )
782 .await
783 .map_err(|e| AppError::Database(format!("Failed to list user agents: {}", e)))?;
784
785 let mut agents = Vec::new();
786 while let Some(row) = rows
787 .next()
788 .await
789 .map_err(|e| AppError::Database(e.to_string()))?
790 {
791 agents.push(Self::row_to_user_agent(&row)?);
792 }
793
794 Ok(agents)
795 }
796
797 pub async fn list_public_agents(&self, limit: u32, offset: u32) -> Result<Vec<UserAgent>> {
799 let conn = self.operation_conn().await?;
800
801 let mut rows = conn
802 .query(
803 "SELECT id, user_id, name, display_name, description, model, system_prompt,
804 tools, max_tool_iterations, parallel_tools, extra, is_public,
805 usage_count, rating_sum, rating_count, created_at, updated_at
806 FROM user_agents WHERE is_public = 1
807 ORDER BY usage_count DESC LIMIT ? OFFSET ?",
808 (limit, offset),
809 )
810 .await
811 .map_err(|e| AppError::Database(format!("Failed to list public agents: {}", e)))?;
812
813 let mut agents = Vec::new();
814 while let Some(row) = rows
815 .next()
816 .await
817 .map_err(|e| AppError::Database(e.to_string()))?
818 {
819 agents.push(Self::row_to_user_agent(&row)?);
820 }
821
822 Ok(agents)
823 }
824
825 pub async fn update_user_agent(&self, agent: &UserAgent) -> Result<()> {
827 let conn = self.operation_conn().await?;
828
829 let display_name = agent.display_name.as_deref();
831 let description = agent.description.as_deref();
832 let system_prompt = agent.system_prompt.as_deref();
833
834 conn.execute(
835 "UPDATE user_agents SET
836 display_name = ?1, description = ?2, model = ?3, system_prompt = ?4,
837 tools = ?5, max_tool_iterations = ?6, parallel_tools = ?7, extra = ?8,
838 is_public = ?9, updated_at = ?10
839 WHERE id = ?11 AND user_id = ?12",
840 params![
841 display_name,
842 description,
843 agent.model.as_str(),
844 system_prompt,
845 agent.tools.as_str(),
846 agent.max_tool_iterations,
847 agent.parallel_tools as i32,
848 agent.extra.as_str(),
849 agent.is_public as i32,
850 agent.updated_at,
851 agent.id.as_str(),
852 agent.user_id.as_str(),
853 ],
854 )
855 .await
856 .map_err(|e| AppError::Database(format!("Failed to update user agent: {}", e)))?;
857
858 Ok(())
859 }
860
861 pub async fn delete_user_agent(&self, id: &str, user_id: &str) -> Result<bool> {
863 let conn = self.operation_conn().await?;
864
865 let affected = conn
866 .execute(
867 "DELETE FROM user_agents WHERE id = ? AND user_id = ?",
868 (id, user_id),
869 )
870 .await
871 .map_err(|e| AppError::Database(format!("Failed to delete user agent: {}", e)))?;
872
873 Ok(affected > 0)
874 }
875
876 pub async fn increment_agent_usage(&self, id: &str) -> Result<()> {
878 let conn = self.operation_conn().await?;
879
880 conn.execute(
881 "UPDATE user_agents SET usage_count = usage_count + 1 WHERE id = ?",
882 [id],
883 )
884 .await
885 .map_err(|e| AppError::Database(format!("Failed to increment agent usage: {}", e)))?;
886
887 Ok(())
888 }
889
890 fn row_to_user_agent(row: &libsql::Row) -> Result<UserAgent> {
892 Ok(UserAgent {
893 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
894 user_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
895 name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
896 display_name: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
897 description: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
898 model: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
899 system_prompt: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
900 tools: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
901 max_tool_iterations: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
902 parallel_tools: row
903 .get::<i32>(9)
904 .map_err(|e| AppError::Database(e.to_string()))?
905 != 0,
906 extra: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
907 is_public: row
908 .get::<i32>(11)
909 .map_err(|e| AppError::Database(e.to_string()))?
910 != 0,
911 usage_count: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
912 rating_sum: row.get(13).map_err(|e| AppError::Database(e.to_string()))?,
913 rating_count: row.get(14).map_err(|e| AppError::Database(e.to_string()))?,
914 created_at: row.get(15).map_err(|e| AppError::Database(e.to_string()))?,
915 updated_at: row.get(16).map_err(|e| AppError::Database(e.to_string()))?,
916 })
917 }
918
919 pub async fn log_agent_execution(&self, execution: &AgentExecution) -> Result<()> {
923 let conn = self.operation_conn().await?;
924
925 let agent_id = execution.agent_id.as_deref();
927 let output = execution.output.as_deref();
928 let tool_calls = execution.tool_calls.as_deref();
929 let error_message = execution.error_message.as_deref();
930
931 conn.execute(
932 "INSERT INTO agent_executions (
933 id, agent_id, agent_name, user_id, input, output, tool_calls,
934 tokens_input, tokens_output, duration_ms, status, error_message, created_at
935 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
936 params![
937 execution.id.as_str(),
938 agent_id,
939 execution.agent_name.as_str(),
940 execution.user_id.as_str(),
941 execution.input.as_str(),
942 output,
943 tool_calls,
944 execution.tokens_input,
945 execution.tokens_output,
946 execution.duration_ms,
947 execution.status.as_str(),
948 error_message,
949 execution.created_at,
950 ],
951 )
952 .await
953 .map_err(|e| AppError::Database(format!("Failed to log agent execution: {}", e)))?;
954
955 Ok(())
956 }
957
958 pub async fn get_user_executions(
960 &self,
961 user_id: &str,
962 limit: u32,
963 ) -> Result<Vec<AgentExecution>> {
964 let conn = self.operation_conn().await?;
965
966 let mut rows = conn
967 .query(
968 "SELECT id, agent_id, agent_name, user_id, input, output, tool_calls,
969 tokens_input, tokens_output, duration_ms, status, error_message, created_at
970 FROM agent_executions WHERE user_id = ?
971 ORDER BY created_at DESC LIMIT ?",
972 (user_id, limit),
973 )
974 .await
975 .map_err(|e| AppError::Database(format!("Failed to query executions: {}", e)))?;
976
977 let mut executions = Vec::new();
978 while let Some(row) = rows
979 .next()
980 .await
981 .map_err(|e| AppError::Database(e.to_string()))?
982 {
983 executions.push(AgentExecution {
984 id: row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
985 agent_id: row.get(1).map_err(|e| AppError::Database(e.to_string()))?,
986 agent_name: row.get(2).map_err(|e| AppError::Database(e.to_string()))?,
987 user_id: row.get(3).map_err(|e| AppError::Database(e.to_string()))?,
988 input: row.get(4).map_err(|e| AppError::Database(e.to_string()))?,
989 output: row.get(5).map_err(|e| AppError::Database(e.to_string()))?,
990 tool_calls: row.get(6).map_err(|e| AppError::Database(e.to_string()))?,
991 tokens_input: row.get(7).map_err(|e| AppError::Database(e.to_string()))?,
992 tokens_output: row.get(8).map_err(|e| AppError::Database(e.to_string()))?,
993 duration_ms: row.get(9).map_err(|e| AppError::Database(e.to_string()))?,
994 status: row.get(10).map_err(|e| AppError::Database(e.to_string()))?,
995 error_message: row.get(11).map_err(|e| AppError::Database(e.to_string()))?,
996 created_at: row.get(12).map_err(|e| AppError::Database(e.to_string()))?,
997 });
998 }
999
1000 Ok(executions)
1001 }
1002}
1003
1004#[derive(Debug, Clone)]
1005pub struct User {
1006 pub id: String,
1007 pub email: String,
1008 pub password_hash: String,
1009 pub name: String,
1010 pub created_at: i64,
1011 pub updated_at: i64,
1012}
1013
1014#[derive(Debug, Clone)]
1017pub struct UserAgent {
1018 pub id: String,
1019 pub user_id: String,
1020 pub name: String,
1021 pub display_name: Option<String>,
1022 pub description: Option<String>,
1023 pub model: String,
1024 pub system_prompt: Option<String>,
1025 pub tools: String,
1027 pub max_tool_iterations: i32,
1028 pub parallel_tools: bool,
1029 pub extra: String,
1031 pub is_public: bool,
1032 pub usage_count: i32,
1033 pub rating_sum: i32,
1034 pub rating_count: i32,
1035 pub created_at: i64,
1036 pub updated_at: i64,
1037}
1038
1039impl UserAgent {
1040 pub fn new(id: String, user_id: String, name: String, model: String) -> Self {
1042 let now = Utc::now().timestamp();
1043 Self {
1044 id,
1045 user_id,
1046 name,
1047 display_name: None,
1048 description: None,
1049 model,
1050 system_prompt: None,
1051 tools: "[]".to_string(),
1052 max_tool_iterations: 10,
1053 parallel_tools: false,
1054 extra: "{}".to_string(),
1055 is_public: false,
1056 usage_count: 0,
1057 rating_sum: 0,
1058 rating_count: 0,
1059 created_at: now,
1060 updated_at: now,
1061 }
1062 }
1063
1064 pub fn tools_vec(&self) -> Vec<String> {
1066 serde_json::from_str(&self.tools).unwrap_or_default()
1067 }
1068
1069 pub fn set_tools(&mut self, tools: Vec<String>) {
1071 self.tools = serde_json::to_string(&tools).unwrap_or_else(|_| "[]".to_string());
1072 }
1073
1074 pub fn average_rating(&self) -> Option<f32> {
1076 if self.rating_count > 0 {
1077 Some(self.rating_sum as f32 / self.rating_count as f32)
1078 } else {
1079 None
1080 }
1081 }
1082}
1083
1084#[derive(Debug, Clone)]
1086pub struct AgentExecution {
1087 pub id: String,
1088 pub agent_id: Option<String>,
1090 pub agent_name: String,
1092 pub user_id: String,
1093 pub input: String,
1094 pub output: Option<String>,
1095 pub tool_calls: Option<String>,
1097 pub tokens_input: Option<i32>,
1098 pub tokens_output: Option<i32>,
1099 pub duration_ms: Option<i32>,
1100 pub status: String,
1102 pub error_message: Option<String>,
1103 pub created_at: i64,
1104}
1105
1106impl AgentExecution {
1107 pub fn new(agent_name: String, user_id: String, input: String) -> Self {
1109 Self {
1110 id: uuid::Uuid::new_v4().to_string(),
1111 agent_id: None,
1112 agent_name,
1113 user_id,
1114 input,
1115 output: None,
1116 tool_calls: None,
1117 tokens_input: None,
1118 tokens_output: None,
1119 duration_ms: None,
1120 status: "pending".to_string(),
1121 error_message: None,
1122 created_at: Utc::now().timestamp(),
1123 }
1124 }
1125
1126 pub fn success(mut self, output: String, duration_ms: i32) -> Self {
1128 self.output = Some(output);
1129 self.duration_ms = Some(duration_ms);
1130 self.status = "success".to_string();
1131 self
1132 }
1133
1134 pub fn error(mut self, error: String) -> Self {
1136 self.error_message = Some(error);
1137 self.status = "error".to_string();
1138 self
1139 }
1140}