1use crate::error::{Error, Result};
7use crate::model::{Plan, PlanStatus, Project};
8use crate::storage::events::{insert_event, Event, EventType};
9use crate::storage::schema::apply_schema;
10use rusqlite::{Connection, OptionalExtension, Transaction};
11use std::collections::HashSet;
12use std::path::Path;
13use std::time::Duration;
14
15#[derive(Debug)]
17pub struct SqliteStorage {
18 conn: Connection,
19}
20
21pub struct MutationContext {
28 pub op_name: String,
30 pub actor: String,
32 pub events: Vec<Event>,
34 pub dirty_sessions: HashSet<String>,
36 pub dirty_issues: HashSet<String>,
37 pub dirty_items: HashSet<String>,
38 pub dirty_plans: HashSet<String>,
39 pub dirty_time_entries: HashSet<String>,
40}
41
42impl MutationContext {
43 #[must_use]
45 pub fn new(op_name: &str, actor: &str) -> Self {
46 Self {
47 op_name: op_name.to_string(),
48 actor: actor.to_string(),
49 events: Vec::new(),
50 dirty_sessions: HashSet::new(),
51 dirty_issues: HashSet::new(),
52 dirty_items: HashSet::new(),
53 dirty_plans: HashSet::new(),
54 dirty_time_entries: HashSet::new(),
55 }
56 }
57
58 pub fn record_event(
60 &mut self,
61 entity_type: &str,
62 entity_id: &str,
63 event_type: EventType,
64 ) {
65 self.events.push(Event::new(
66 entity_type,
67 entity_id,
68 event_type,
69 &self.actor,
70 ));
71 }
72
73 pub fn record_change(
75 &mut self,
76 entity_type: &str,
77 entity_id: &str,
78 event_type: EventType,
79 old_value: Option<String>,
80 new_value: Option<String>,
81 ) {
82 self.events.push(
83 Event::new(entity_type, entity_id, event_type, &self.actor)
84 .with_values(old_value, new_value),
85 );
86 }
87
88 pub fn mark_session_dirty(&mut self, session_id: &str) {
90 self.dirty_sessions.insert(session_id.to_string());
91 }
92
93 pub fn mark_issue_dirty(&mut self, issue_id: &str) {
95 self.dirty_issues.insert(issue_id.to_string());
96 }
97
98 pub fn mark_item_dirty(&mut self, item_id: &str) {
100 self.dirty_items.insert(item_id.to_string());
101 }
102
103 pub fn mark_plan_dirty(&mut self, plan_id: &str) {
105 self.dirty_plans.insert(plan_id.to_string());
106 }
107
108 pub fn mark_time_entry_dirty(&mut self, time_entry_id: &str) {
110 self.dirty_time_entries.insert(time_entry_id.to_string());
111 }
112}
113
114#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
119pub struct BackfillStats {
120 pub sessions: usize,
122 pub issues: usize,
124 pub context_items: usize,
126 pub plans: usize,
128 pub time_entries: usize,
130}
131
132impl BackfillStats {
133 #[must_use]
135 pub fn any(&self) -> bool {
136 self.sessions > 0 || self.issues > 0 || self.context_items > 0 || self.plans > 0 || self.time_entries > 0
137 }
138
139 #[must_use]
141 pub fn total(&self) -> usize {
142 self.sessions + self.issues + self.context_items + self.plans + self.time_entries
143 }
144}
145
146#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
151pub struct ProjectCounts {
152 pub sessions: usize,
154 pub issues: usize,
156 pub context_items: usize,
158 pub memories: usize,
160 pub checkpoints: usize,
162}
163
164impl ProjectCounts {
165 #[must_use]
167 pub fn total(&self) -> usize {
168 self.sessions + self.issues + self.context_items + self.memories + self.checkpoints
169 }
170}
171
172impl SqliteStorage {
173 pub fn open(path: &Path) -> Result<Self> {
181 Self::open_with_timeout(path, None)
182 }
183
184 pub fn open_with_timeout(path: &Path, timeout_ms: Option<u64>) -> Result<Self> {
190 let conn = Connection::open(path)?;
191
192 if let Some(timeout) = timeout_ms {
193 conn.busy_timeout(Duration::from_millis(timeout))?;
194 } else {
195 conn.busy_timeout(Duration::from_secs(5))?;
197 }
198
199 apply_schema(&conn)?;
200 Ok(Self { conn })
201 }
202
203 pub fn open_memory() -> Result<Self> {
209 let conn = Connection::open_in_memory()?;
210 apply_schema(&conn)?;
211 Ok(Self { conn })
212 }
213
214 #[must_use]
216 pub fn conn(&self) -> &Connection {
217 &self.conn
218 }
219
220 pub fn checkpoint(&self) -> Result<()> {
226 self.conn
227 .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
228 Ok(())
229 }
230
231 pub fn backup_to(&self, dest_path: &Path) -> Result<()> {
237 self.checkpoint()?;
238 let mut dest = Connection::open(dest_path)?;
239 let backup = rusqlite::backup::Backup::new(&self.conn, &mut dest)?;
240 backup.step(-1)?;
242 Ok(())
243 }
244
245 pub fn mutate<F, R>(&mut self, op: &str, actor: &str, f: F) -> Result<R>
258 where
259 F: FnOnce(&Transaction, &mut MutationContext) -> Result<R>,
260 {
261 let tx = self
262 .conn
263 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
264
265 let mut ctx = MutationContext::new(op, actor);
266
267 let result = f(&tx, &mut ctx)?;
269
270 for event in &ctx.events {
272 insert_event(&tx, event)?;
273 }
274
275 tx.commit()?;
280
281 Ok(result)
282 }
283
284 pub fn create_session(
294 &mut self,
295 id: &str,
296 name: &str,
297 description: Option<&str>,
298 project_path: Option<&str>,
299 branch: Option<&str>,
300 actor: &str,
301 ) -> Result<()> {
302 let now = chrono::Utc::now().timestamp_millis();
303
304 self.mutate("create_session", actor, |tx, ctx| {
305 tx.execute(
306 "INSERT INTO sessions (id, name, description, project_path, branch, status, created_at, updated_at)
307 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?6)",
308 rusqlite::params![id, name, description, project_path, branch, now],
309 )?;
310
311 if let Some(path) = project_path {
313 tx.execute(
314 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
315 rusqlite::params![id, path, now],
316 )?;
317 }
318
319 ctx.record_event("session", id, EventType::SessionCreated);
320 ctx.mark_session_dirty(id);
321
322 Ok(())
323 })
324 }
325
326 pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
332 let mut stmt = self.conn.prepare(
333 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
334 FROM sessions WHERE id = ?1",
335 )?;
336
337 let session = stmt
338 .query_row([id], |row| {
339 Ok(Session {
340 id: row.get(0)?,
341 name: row.get(1)?,
342 description: row.get(2)?,
343 branch: row.get(3)?,
344 channel: row.get(4)?,
345 project_path: row.get(5)?,
346 status: row.get(6)?,
347 ended_at: row.get(7)?,
348 created_at: row.get(8)?,
349 updated_at: row.get(9)?,
350 })
351 })
352 .optional()?;
353
354 Ok(session)
355 }
356
357 pub fn list_sessions(
363 &self,
364 project_path: Option<&str>,
365 status: Option<&str>,
366 limit: Option<u32>,
367 ) -> Result<Vec<Session>> {
368 self.list_sessions_with_search(project_path, status, limit, None)
369 }
370
371 pub fn list_sessions_with_search(
380 &self,
381 project_path: Option<&str>,
382 status: Option<&str>,
383 limit: Option<u32>,
384 search: Option<&str>,
385 ) -> Result<Vec<Session>> {
386 let limit = limit.unwrap_or(50);
387
388 let mut conditions: Vec<String> = Vec::new();
390 let mut params: Vec<String> = Vec::new();
391 let mut param_idx = 1;
392
393 let (from_clause, select_distinct) = if let Some(path) = project_path {
395 conditions.push(format!("sp.project_path = ?{param_idx}"));
397 params.push(path.to_string());
398 param_idx += 1;
399 (
400 "sessions s JOIN session_projects sp ON s.id = sp.session_id".to_string(),
401 "DISTINCT ",
402 )
403 } else {
404 ("sessions s".to_string(), "")
406 };
407
408 if let Some(st) = status {
409 conditions.push(format!("s.status = ?{param_idx}"));
410 params.push(st.to_string());
411 param_idx += 1;
412 }
413
414 if let Some(search_term) = search {
415 conditions.push(format!(
417 "(s.name LIKE ?{param_idx} COLLATE NOCASE OR s.description LIKE ?{param_idx} COLLATE NOCASE)"
418 ));
419 params.push(format!("%{search_term}%"));
420 param_idx += 1;
421 }
422
423 let where_clause = if conditions.is_empty() {
424 " WHERE 1=1".to_string()
425 } else {
426 format!(" WHERE {}", conditions.join(" AND "))
427 };
428
429 let sql = format!(
430 "SELECT {select_distinct}s.id, s.name, s.description, s.branch, s.channel, s.project_path, s.status, s.ended_at, s.created_at, s.updated_at
431 FROM {from_clause}{where_clause}
432 ORDER BY s.updated_at DESC LIMIT ?{param_idx}"
433 );
434 params.push(limit.to_string());
435
436 let mut stmt = self.conn.prepare(&sql)?;
437 let params_refs: Vec<&dyn rusqlite::ToSql> = params
438 .iter()
439 .map(|s| s as &dyn rusqlite::ToSql)
440 .collect();
441
442 let rows = stmt.query_map(params_refs.as_slice(), |row| {
443 Ok(Session {
444 id: row.get(0)?,
445 name: row.get(1)?,
446 description: row.get(2)?,
447 branch: row.get(3)?,
448 channel: row.get(4)?,
449 project_path: row.get(5)?,
450 status: row.get(6)?,
451 ended_at: row.get(7)?,
452 created_at: row.get(8)?,
453 updated_at: row.get(9)?,
454 })
455 })?;
456
457 rows.collect::<std::result::Result<Vec<_>, _>>()
458 .map_err(Error::from)
459 }
460
461 pub fn update_session_status(
467 &mut self,
468 id: &str,
469 status: &str,
470 actor: &str,
471 ) -> Result<()> {
472 let now = chrono::Utc::now().timestamp_millis();
473 let ended_at = if status == "completed" || status == "paused" {
474 Some(now)
475 } else {
476 None
477 };
478
479 self.mutate("update_session_status", actor, |tx, ctx| {
480 let rows = tx.execute(
481 "UPDATE sessions SET status = ?1, ended_at = ?2, updated_at = ?3 WHERE id = ?4",
482 rusqlite::params![status, ended_at, now, id],
483 )?;
484
485 if rows == 0 {
486 return Err(Error::SessionNotFound { id: id.to_string() });
487 }
488
489 let event_type = match status {
490 "paused" => EventType::SessionPaused,
491 "completed" => EventType::SessionCompleted,
492 _ => EventType::SessionUpdated,
493 };
494 ctx.record_event("session", id, event_type);
495 ctx.mark_session_dirty(id);
496
497 Ok(())
498 })
499 }
500
501 pub fn rename_session(
507 &mut self,
508 id: &str,
509 new_name: &str,
510 actor: &str,
511 ) -> Result<()> {
512 let now = chrono::Utc::now().timestamp_millis();
513
514 self.mutate("rename_session", actor, |tx, ctx| {
515 let rows = tx.execute(
516 "UPDATE sessions SET name = ?1, updated_at = ?2 WHERE id = ?3",
517 rusqlite::params![new_name, now, id],
518 )?;
519
520 if rows == 0 {
521 return Err(Error::SessionNotFound { id: id.to_string() });
522 }
523
524 ctx.record_event("session", id, EventType::SessionUpdated);
525 ctx.mark_session_dirty(id);
526
527 Ok(())
528 })
529 }
530
531 pub fn delete_session(&mut self, id: &str, actor: &str) -> Result<()> {
542 self.mutate("delete_session", actor, |tx, ctx| {
543 let exists: bool = tx
545 .query_row(
546 "SELECT 1 FROM sessions WHERE id = ?1",
547 [id],
548 |_| Ok(true),
549 )
550 .unwrap_or(false);
551
552 if !exists {
553 return Err(Error::SessionNotFound { id: id.to_string() });
554 }
555
556 tx.execute(
558 "DELETE FROM context_items WHERE session_id = ?1",
559 [id],
560 )?;
561
562 tx.execute(
564 "DELETE FROM checkpoints WHERE session_id = ?1",
565 [id],
566 )?;
567
568 tx.execute(
570 "DELETE FROM session_projects WHERE session_id = ?1",
571 [id],
572 )?;
573
574 tx.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
576
577 ctx.record_event("session", id, EventType::SessionDeleted);
578
579 Ok(())
580 })
581 }
582
583 pub fn add_session_path(
589 &mut self,
590 session_id: &str,
591 project_path: &str,
592 actor: &str,
593 ) -> Result<()> {
594 let now = chrono::Utc::now().timestamp_millis();
595
596 self.mutate("add_session_path", actor, |tx, ctx| {
597 let exists: bool = tx
599 .query_row(
600 "SELECT 1 FROM sessions WHERE id = ?1",
601 [session_id],
602 |_| Ok(true),
603 )
604 .unwrap_or(false);
605
606 if !exists {
607 return Err(Error::SessionNotFound { id: session_id.to_string() });
608 }
609
610 let result = tx.execute(
612 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
613 rusqlite::params![session_id, project_path, now],
614 );
615
616 match result {
617 Ok(_) => {
618 ctx.record_event("session", session_id, EventType::SessionPathAdded);
619 ctx.mark_session_dirty(session_id);
620 Ok(())
621 }
622 Err(rusqlite::Error::SqliteFailure(err, _))
623 if err.code == rusqlite::ErrorCode::ConstraintViolation =>
624 {
625 Err(Error::Other(format!(
626 "Path already added to session: {project_path}"
627 )))
628 }
629 Err(e) => Err(e.into()),
630 }
631 })
632 }
633
634 pub fn remove_session_path(
642 &mut self,
643 session_id: &str,
644 project_path: &str,
645 actor: &str,
646 ) -> Result<()> {
647 self.mutate("remove_session_path", actor, |tx, ctx| {
648 let session_path: Option<String> = tx
650 .query_row(
651 "SELECT project_path FROM sessions WHERE id = ?1",
652 [session_id],
653 |row| row.get(0),
654 )
655 .optional()?;
656
657 let primary_path = session_path.ok_or_else(|| Error::SessionNotFound {
658 id: session_id.to_string(),
659 })?;
660
661 if primary_path == project_path {
663 return Err(Error::Other(
664 "Cannot remove primary project path. Use delete_session instead.".to_string(),
665 ));
666 }
667
668 let rows = tx.execute(
670 "DELETE FROM session_projects WHERE session_id = ?1 AND project_path = ?2",
671 rusqlite::params![session_id, project_path],
672 )?;
673
674 if rows == 0 {
675 return Err(Error::Other(format!(
676 "Path not found in session: {project_path}"
677 )));
678 }
679
680 ctx.record_event("session", session_id, EventType::SessionPathRemoved);
681 ctx.mark_session_dirty(session_id);
682
683 Ok(())
684 })
685 }
686
687 pub fn get_session_paths(&self, session_id: &str) -> Result<Vec<String>> {
691 let conn = self.conn();
692
693 let primary_path: Option<String> = conn
695 .query_row(
696 "SELECT project_path FROM sessions WHERE id = ?1",
697 [session_id],
698 |row| row.get(0),
699 )
700 .optional()?;
701
702 let Some(primary) = primary_path else {
703 return Err(Error::SessionNotFound { id: session_id.to_string() });
704 };
705
706 let mut stmt = conn.prepare(
708 "SELECT project_path FROM session_projects WHERE session_id = ?1 ORDER BY added_at",
709 )?;
710
711 let additional_paths: Vec<String> = stmt
712 .query_map([session_id], |row| row.get(0))?
713 .filter_map(|r| r.ok())
714 .collect();
715
716 let mut paths = vec![primary];
718 paths.extend(additional_paths);
719
720 Ok(paths)
721 }
722
723 pub fn save_context_item(
733 &mut self,
734 id: &str,
735 session_id: &str,
736 key: &str,
737 value: &str,
738 category: Option<&str>,
739 priority: Option<&str>,
740 actor: &str,
741 ) -> Result<()> {
742 let now = chrono::Utc::now().timestamp_millis();
743 let category = category.unwrap_or("note");
744 let priority = priority.unwrap_or("normal");
745 let size = value.len() as i64;
746
747 self.mutate("save_context_item", actor, |tx, ctx| {
748 let exists: bool = tx
750 .prepare("SELECT 1 FROM context_items WHERE session_id = ?1 AND key = ?2")?
751 .exists(rusqlite::params![session_id, key])?;
752
753 tx.execute(
754 "INSERT INTO context_items (id, session_id, key, value, category, priority, size, created_at, updated_at)
755 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
756 ON CONFLICT(session_id, key) DO UPDATE SET
757 value = excluded.value,
758 category = excluded.category,
759 priority = excluded.priority,
760 size = excluded.size,
761 updated_at = excluded.updated_at",
762 rusqlite::params![id, session_id, key, value, category, priority, size, now],
763 )?;
764
765 let event_type = if exists {
766 EventType::ItemUpdated
767 } else {
768 EventType::ItemCreated
769 };
770 ctx.record_event("context_item", id, event_type);
771 ctx.mark_item_dirty(id);
772
773 Ok(())
774 })
775 }
776
777 pub fn get_item_id_by_key(&self, session_id: &str, key: &str) -> Result<Option<String>> {
781 let id = self.conn.query_row(
782 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
783 rusqlite::params![session_id, key],
784 |row| row.get(0),
785 ).optional()?;
786 Ok(id)
787 }
788
789 pub fn get_items_with_fast_embeddings(
794 &self,
795 session_id: &str,
796 ) -> Result<Vec<(ContextItem, Option<Vec<f32>>)>> {
797 let sql = "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
798 ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at,
799 ec.embedding
800 FROM context_items ci
801 LEFT JOIN embedding_chunks_fast ec ON ec.item_id = ci.id AND ec.chunk_index = 0
802 WHERE ci.session_id = ?1
803 ORDER BY ci.updated_at DESC";
804
805 let mut stmt = self.conn.prepare(sql)?;
806 let rows = stmt.query_map(rusqlite::params![session_id], |row| {
807 let item = ContextItem {
808 id: row.get(0)?,
809 session_id: row.get(1)?,
810 key: row.get(2)?,
811 value: row.get(3)?,
812 category: row.get(4)?,
813 priority: row.get(5)?,
814 channel: row.get(6)?,
815 tags: row.get(7)?,
816 size: row.get(8)?,
817 created_at: row.get(9)?,
818 updated_at: row.get(10)?,
819 };
820
821 let embedding: Option<Vec<f32>> = row.get::<_, Option<Vec<u8>>>(11)?
822 .map(|blob| {
823 blob.chunks_exact(4)
824 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
825 .collect()
826 });
827
828 Ok((item, embedding))
829 })?;
830
831 let mut results = Vec::new();
832 for row in rows {
833 results.push(row?);
834 }
835 Ok(results)
836 }
837
838 pub fn get_context_items(
844 &self,
845 session_id: &str,
846 category: Option<&str>,
847 priority: Option<&str>,
848 limit: Option<u32>,
849 ) -> Result<Vec<ContextItem>> {
850 let limit = limit.unwrap_or(100);
851
852 let mut sql = String::from(
853 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
854 FROM context_items WHERE session_id = ?1",
855 );
856
857 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(session_id.to_string())];
858
859 if let Some(cat) = category {
860 sql.push_str(" AND category = ?");
861 params.push(Box::new(cat.to_string()));
862 }
863
864 if let Some(pri) = priority {
865 sql.push_str(" AND priority = ?");
866 params.push(Box::new(pri.to_string()));
867 }
868
869 sql.push_str(" ORDER BY created_at DESC LIMIT ?");
870 params.push(Box::new(limit));
871
872 let mut stmt = self.conn.prepare(&sql)?;
873 let params_refs: Vec<&dyn rusqlite::ToSql> = params
874 .iter()
875 .map(|b| b.as_ref())
876 .collect();
877
878 let rows = stmt.query_map(params_refs.as_slice(), |row| {
879 Ok(ContextItem {
880 id: row.get(0)?,
881 session_id: row.get(1)?,
882 key: row.get(2)?,
883 value: row.get(3)?,
884 category: row.get(4)?,
885 priority: row.get(5)?,
886 channel: row.get(6)?,
887 tags: row.get(7)?,
888 size: row.get(8)?,
889 created_at: row.get(9)?,
890 updated_at: row.get(10)?,
891 })
892 })?;
893
894 rows.collect::<std::result::Result<Vec<_>, _>>()
895 .map_err(Error::from)
896 }
897
898 pub fn delete_context_item(
904 &mut self,
905 session_id: &str,
906 key: &str,
907 actor: &str,
908 ) -> Result<()> {
909 self.mutate("delete_context_item", actor, |tx, ctx| {
910 let info: Option<(String, Option<String>)> = tx
912 .query_row(
913 "SELECT ci.id, s.project_path
914 FROM context_items ci
915 JOIN sessions s ON ci.session_id = s.id
916 WHERE ci.session_id = ?1 AND ci.key = ?2",
917 rusqlite::params![session_id, key],
918 |row| Ok((row.get(0)?, row.get(1)?)),
919 )
920 .optional()?;
921
922 let rows = tx.execute(
923 "DELETE FROM context_items WHERE session_id = ?1 AND key = ?2",
924 rusqlite::params![session_id, key],
925 )?;
926
927 if rows > 0 {
928 if let Some((item_id, project_path)) = info {
929 ctx.record_event("context_item", &item_id, EventType::ItemDeleted);
930
931 if let Some(ref path) = project_path {
933 let now = chrono::Utc::now().timestamp_millis();
934 tx.execute(
935 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
936 VALUES ('context_item', ?1, ?2, ?3, ?4, 0)
937 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
938 deleted_at = excluded.deleted_at,
939 deleted_by = excluded.deleted_by,
940 exported = 0",
941 rusqlite::params![item_id, path, now, ctx.actor],
942 )?;
943 }
944 }
945 }
946
947 Ok(())
948 })
949 }
950
951 pub fn update_context_item(
957 &mut self,
958 session_id: &str,
959 key: &str,
960 value: Option<&str>,
961 category: Option<&str>,
962 priority: Option<&str>,
963 channel: Option<&str>,
964 actor: &str,
965 ) -> Result<()> {
966 self.mutate("update_context_item", actor, |tx, ctx| {
967 let now = chrono::Utc::now().timestamp_millis();
968
969 let mut set_parts: Vec<&str> = vec!["updated_at"];
971 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
972
973 if let Some(v) = value {
974 set_parts.push("value");
975 set_parts.push("size");
976 params.push(Box::new(v.to_string()));
977 params.push(Box::new(v.len() as i64));
978 }
979 if let Some(c) = category {
980 set_parts.push("category");
981 params.push(Box::new(c.to_string()));
982 }
983 if let Some(p) = priority {
984 set_parts.push("priority");
985 params.push(Box::new(p.to_string()));
986 }
987 if let Some(ch) = channel {
988 set_parts.push("channel");
989 params.push(Box::new(ch.to_string()));
990 }
991
992 let item_id: Option<String> = tx
994 .query_row(
995 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
996 rusqlite::params![session_id, key],
997 |row| row.get(0),
998 )
999 .optional()?;
1000
1001 if item_id.is_none() {
1002 return Err(Error::Database(rusqlite::Error::QueryReturnedNoRows));
1003 }
1004
1005 let set_clause: String = set_parts
1007 .iter()
1008 .enumerate()
1009 .map(|(i, field)| format!("{} = ?{}", field, i + 1))
1010 .collect::<Vec<_>>()
1011 .join(", ");
1012
1013 let param_count = params.len();
1014 let query = format!(
1015 "UPDATE context_items SET {} WHERE session_id = ?{} AND key = ?{}",
1016 set_clause,
1017 param_count + 1,
1018 param_count + 2
1019 );
1020
1021 params.push(Box::new(session_id.to_string()));
1022 params.push(Box::new(key.to_string()));
1023
1024 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1025 tx.execute(&query, param_refs.as_slice())?;
1026
1027 if let Some(id) = item_id {
1028 ctx.record_event("context_item", &id, EventType::ItemUpdated);
1029 }
1030
1031 Ok(())
1032 })
1033 }
1034
1035 pub fn add_tags_to_item(
1041 &mut self,
1042 session_id: &str,
1043 key: &str,
1044 tags_to_add: &[String],
1045 actor: &str,
1046 ) -> Result<()> {
1047 self.mutate("add_tags_to_item", actor, |tx, ctx| {
1048 let now = chrono::Utc::now().timestamp_millis();
1049
1050 let (item_id, current_tags): (String, String) = tx.query_row(
1052 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1053 rusqlite::params![session_id, key],
1054 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1055 )?;
1056
1057 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
1059
1060 for tag in tags_to_add {
1062 if !tags.contains(tag) {
1063 tags.push(tag.clone());
1064 }
1065 }
1066
1067 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1069 tx.execute(
1070 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1071 rusqlite::params![new_tags, now, item_id],
1072 )?;
1073
1074 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1075
1076 Ok(())
1077 })
1078 }
1079
1080 pub fn remove_tags_from_item(
1086 &mut self,
1087 session_id: &str,
1088 key: &str,
1089 tags_to_remove: &[String],
1090 actor: &str,
1091 ) -> Result<()> {
1092 self.mutate("remove_tags_from_item", actor, |tx, ctx| {
1093 let now = chrono::Utc::now().timestamp_millis();
1094
1095 let (item_id, current_tags): (String, String) = tx.query_row(
1097 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1098 rusqlite::params![session_id, key],
1099 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1100 )?;
1101
1102 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
1104
1105 tags.retain(|t| !tags_to_remove.contains(t));
1107
1108 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1110 tx.execute(
1111 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1112 rusqlite::params![new_tags, now, item_id],
1113 )?;
1114
1115 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1116
1117 Ok(())
1118 })
1119 }
1120
1121 #[allow(clippy::too_many_arguments)]
1131 pub fn create_issue(
1132 &mut self,
1133 id: &str,
1134 short_id: Option<&str>,
1135 project_path: &str,
1136 title: &str,
1137 description: Option<&str>,
1138 details: Option<&str>,
1139 issue_type: Option<&str>,
1140 priority: Option<i32>,
1141 plan_id: Option<&str>,
1142 actor: &str,
1143 ) -> Result<()> {
1144 let now = chrono::Utc::now().timestamp_millis();
1145 let issue_type = issue_type.unwrap_or("task");
1146 let priority = priority.unwrap_or(2);
1147
1148 self.mutate("create_issue", actor, |tx, ctx| {
1149 tx.execute(
1150 "INSERT INTO issues (id, short_id, project_path, title, description, details, issue_type, priority, plan_id, status, created_by_agent, created_at, updated_at)
1151 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 'open', ?10, ?11, ?11)",
1152 rusqlite::params![id, short_id, project_path, title, description, details, issue_type, priority, plan_id, actor, now],
1153 )?;
1154
1155 ctx.record_event("issue", id, EventType::IssueCreated);
1156 ctx.mark_issue_dirty(id);
1157
1158 Ok(())
1159 })
1160 }
1161
1162 pub fn get_issue(&self, id: &str, project_path: Option<&str>) -> Result<Option<Issue>> {
1168 let sql = if project_path.is_some() {
1170 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1171 FROM issues WHERE (id = ?1 OR short_id = ?1) AND project_path = ?2"
1172 } else {
1173 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1174 FROM issues WHERE id = ?1 OR short_id = ?1"
1175 };
1176
1177 let mut stmt = self.conn.prepare(sql)?;
1178
1179 let issue = if let Some(path) = project_path {
1180 stmt.query_row(rusqlite::params![id, path], map_issue_row)
1181 } else {
1182 stmt.query_row([id], map_issue_row)
1183 }
1184 .optional()?;
1185
1186 Ok(issue)
1187 }
1188
1189 pub fn list_issues(
1195 &self,
1196 project_path: &str,
1197 status: Option<&str>,
1198 issue_type: Option<&str>,
1199 limit: Option<u32>,
1200 ) -> Result<Vec<Issue>> {
1201 let limit = limit.unwrap_or(50);
1202
1203 let mut sql = String::from(
1204 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1205 FROM issues WHERE project_path = ?1",
1206 );
1207
1208 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
1209
1210 if let Some(st) = status {
1211 if st != "all" {
1212 sql.push_str(" AND status = ?");
1213 params.push(Box::new(st.to_string()));
1214 }
1215 } else {
1216 sql.push_str(" AND status != 'closed'");
1218 }
1219
1220 if let Some(t) = issue_type {
1221 sql.push_str(" AND issue_type = ?");
1222 params.push(Box::new(t.to_string()));
1223 }
1224
1225 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1226 params.push(Box::new(limit));
1227
1228 let mut stmt = self.conn.prepare(&sql)?;
1229 let params_refs: Vec<&dyn rusqlite::ToSql> = params
1230 .iter()
1231 .map(|b| b.as_ref())
1232 .collect();
1233
1234 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1235
1236 rows.collect::<std::result::Result<Vec<_>, _>>()
1237 .map_err(Error::from)
1238 }
1239
1240 pub fn list_all_issues(
1246 &self,
1247 status: Option<&str>,
1248 issue_type: Option<&str>,
1249 limit: Option<u32>,
1250 ) -> Result<Vec<Issue>> {
1251 let limit = limit.unwrap_or(50);
1252
1253 let mut sql = String::from(
1254 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1255 FROM issues WHERE 1=1",
1256 );
1257
1258 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1259
1260 if let Some(st) = status {
1261 if st != "all" {
1262 sql.push_str(" AND status = ?");
1263 params.push(Box::new(st.to_string()));
1264 }
1265 } else {
1266 sql.push_str(" AND status != 'closed'");
1268 }
1269
1270 if let Some(t) = issue_type {
1271 sql.push_str(" AND issue_type = ?");
1272 params.push(Box::new(t.to_string()));
1273 }
1274
1275 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1276 params.push(Box::new(limit));
1277
1278 let mut stmt = self.conn.prepare(&sql)?;
1279 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1280
1281 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1282
1283 rows.collect::<std::result::Result<Vec<_>, _>>()
1284 .map_err(Error::from)
1285 }
1286
1287 pub fn update_issue_status(
1295 &mut self,
1296 id: &str,
1297 status: &str,
1298 actor: &str,
1299 ) -> Result<()> {
1300 let now = chrono::Utc::now().timestamp_millis();
1301 let closed_at = if status == "closed" { Some(now) } else { None };
1302
1303 self.mutate("update_issue_status", actor, |tx, ctx| {
1304 let rows = tx.execute(
1305 "UPDATE issues SET status = ?1, closed_at = ?2, closed_by_agent = ?3, updated_at = ?4 WHERE id = ?5 OR short_id = ?5",
1306 rusqlite::params![status, closed_at, if status == "closed" { Some(actor) } else { None }, now, id],
1307 )?;
1308
1309 if rows == 0 {
1310 return Err(Error::IssueNotFound { id: id.to_string() });
1311 }
1312
1313 let event_type = if status == "closed" {
1314 EventType::IssueClosed
1315 } else {
1316 EventType::IssueUpdated
1317 };
1318 ctx.record_event("issue", id, event_type);
1319 ctx.mark_issue_dirty(id);
1320
1321 Ok(())
1322 })
1323 }
1324
1325 #[allow(clippy::too_many_arguments)]
1333 pub fn update_issue(
1334 &mut self,
1335 id: &str,
1336 title: Option<&str>,
1337 description: Option<&str>,
1338 details: Option<&str>,
1339 priority: Option<i32>,
1340 issue_type: Option<&str>,
1341 plan_id: Option<&str>,
1342 parent_id: Option<&str>,
1343 actor: &str,
1344 ) -> Result<()> {
1345 let now = chrono::Utc::now().timestamp_millis();
1346
1347 let mut set_clauses = vec!["updated_at = ?"];
1349 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1350
1351 if let Some(t) = title {
1352 set_clauses.push("title = ?");
1353 params.push(Box::new(t.to_string()));
1354 }
1355 if let Some(d) = description {
1356 set_clauses.push("description = ?");
1357 params.push(Box::new(d.to_string()));
1358 }
1359 if let Some(dt) = details {
1360 set_clauses.push("details = ?");
1361 params.push(Box::new(dt.to_string()));
1362 }
1363 if let Some(p) = priority {
1364 set_clauses.push("priority = ?");
1365 params.push(Box::new(p));
1366 }
1367 if let Some(it) = issue_type {
1368 set_clauses.push("issue_type = ?");
1369 params.push(Box::new(it.to_string()));
1370 }
1371 if let Some(pid) = plan_id {
1372 set_clauses.push("plan_id = ?");
1373 params.push(Box::new(pid.to_string()));
1374 }
1375
1376 if set_clauses.len() == 1 && parent_id.is_none() {
1378 return Ok(());
1379 }
1380
1381 self.mutate("update_issue", actor, |tx, ctx| {
1382 if set_clauses.len() > 1 {
1384 let sql = format!(
1385 "UPDATE issues SET {} WHERE id = ? OR short_id = ?",
1386 set_clauses.join(", ")
1387 );
1388 params.push(Box::new(id.to_string()));
1389 params.push(Box::new(id.to_string()));
1390
1391 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1392 let rows = tx.execute(&sql, param_refs.as_slice())?;
1393
1394 if rows == 0 {
1395 return Err(Error::IssueNotFound { id: id.to_string() });
1396 }
1397 }
1398
1399 if let Some(new_parent) = parent_id {
1401 let full_id: String = tx.query_row(
1403 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1404 [id],
1405 |row| row.get(0),
1406 )?;
1407
1408 tx.execute(
1410 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND dependency_type = 'parent-child'",
1411 [&full_id],
1412 )?;
1413
1414 if !new_parent.is_empty() {
1416 let parent_full_id: String = tx.query_row(
1417 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1418 [new_parent],
1419 |row| row.get(0),
1420 )?;
1421
1422 tx.execute(
1423 "INSERT INTO issue_dependencies (issue_id, depends_on_id, dependency_type, created_at)
1424 VALUES (?1, ?2, 'parent-child', ?3)",
1425 rusqlite::params![full_id, parent_full_id, now],
1426 )?;
1427 }
1428 }
1429
1430 ctx.record_event("issue", id, EventType::IssueUpdated);
1431 ctx.mark_issue_dirty(id);
1432
1433 Ok(())
1434 })
1435 }
1436
1437 pub fn claim_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1445 let now = chrono::Utc::now().timestamp_millis();
1446
1447 self.mutate("claim_issue", actor, |tx, ctx| {
1448 let rows = tx.execute(
1449 "UPDATE issues SET assigned_to_agent = ?1, assigned_at = ?2, status = 'in_progress', updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
1450 rusqlite::params![actor, now, id],
1451 )?;
1452
1453 if rows == 0 {
1454 return Err(Error::IssueNotFound { id: id.to_string() });
1455 }
1456
1457 ctx.record_event("issue", id, EventType::IssueClaimed);
1458 ctx.mark_issue_dirty(id);
1459
1460 Ok(())
1461 })
1462 }
1463
1464 pub fn release_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1472 let now = chrono::Utc::now().timestamp_millis();
1473
1474 self.mutate("release_issue", actor, |tx, ctx| {
1475 let rows = tx.execute(
1476 "UPDATE issues SET assigned_to_agent = NULL, assigned_at = NULL, status = 'open', updated_at = ?1 WHERE id = ?2 OR short_id = ?2",
1477 rusqlite::params![now, id],
1478 )?;
1479
1480 if rows == 0 {
1481 return Err(Error::IssueNotFound { id: id.to_string() });
1482 }
1483
1484 ctx.record_event("issue", id, EventType::IssueReleased);
1485 ctx.mark_issue_dirty(id);
1486
1487 Ok(())
1488 })
1489 }
1490
1491 pub fn delete_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1499 self.mutate("delete_issue", actor, |tx, ctx| {
1500 let info: Option<(String, String)> = tx
1502 .query_row(
1503 "SELECT id, project_path FROM issues WHERE id = ?1 OR short_id = ?1",
1504 [id],
1505 |row| Ok((row.get(0)?, row.get(1)?)),
1506 )
1507 .optional()?;
1508
1509 let (full_id, project_path) =
1510 info.ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1511
1512 tx.execute(
1514 "DELETE FROM issue_dependencies WHERE issue_id = ?1 OR depends_on_id = ?1",
1515 [&full_id],
1516 )?;
1517
1518 let rows = tx.execute("DELETE FROM issues WHERE id = ?1", [&full_id])?;
1520
1521 if rows == 0 {
1522 return Err(Error::IssueNotFound { id: id.to_string() });
1523 }
1524
1525 ctx.record_event("issue", &full_id, EventType::IssueDeleted);
1526
1527 let now = chrono::Utc::now().timestamp_millis();
1529 tx.execute(
1530 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
1531 VALUES ('issue', ?1, ?2, ?3, ?4, 0)
1532 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
1533 deleted_at = excluded.deleted_at,
1534 deleted_by = excluded.deleted_by,
1535 exported = 0",
1536 rusqlite::params![full_id, project_path, now, ctx.actor],
1537 )?;
1538
1539 Ok(())
1540 })
1541 }
1542
1543 pub fn add_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1549 self.mutate("add_issue_labels", actor, |tx, ctx| {
1550 let full_id: String = tx
1552 .query_row(
1553 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1554 [id],
1555 |row| row.get(0),
1556 )
1557 .optional()?
1558 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1559
1560 for label in labels {
1561 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1562 tx.execute(
1563 "INSERT OR IGNORE INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1564 rusqlite::params![label_id, full_id, label],
1565 )?;
1566 }
1567
1568 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1569 Ok(())
1570 })
1571 }
1572
1573 pub fn remove_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1579 self.mutate("remove_issue_labels", actor, |tx, ctx| {
1580 let full_id: String = tx
1582 .query_row(
1583 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1584 [id],
1585 |row| row.get(0),
1586 )
1587 .optional()?
1588 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1589
1590 for label in labels {
1591 tx.execute(
1592 "DELETE FROM issue_labels WHERE issue_id = ?1 AND label = ?2",
1593 rusqlite::params![full_id, label],
1594 )?;
1595 }
1596
1597 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1598 Ok(())
1599 })
1600 }
1601
1602 pub fn get_issue_labels(&self, id: &str) -> Result<Vec<String>> {
1608 let full_id: String = self
1609 .conn
1610 .query_row(
1611 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1612 [id],
1613 |row| row.get(0),
1614 )
1615 .optional()?
1616 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1617
1618 let mut stmt = self
1619 .conn
1620 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1 ORDER BY label")?;
1621 let labels = stmt
1622 .query_map([&full_id], |row| row.get(0))?
1623 .collect::<std::result::Result<Vec<String>, _>>()?;
1624 Ok(labels)
1625 }
1626
1627 pub fn issue_has_dependencies(&self, id: &str) -> Result<bool> {
1629 let full_id: String = self
1630 .conn
1631 .query_row(
1632 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1633 [id],
1634 |row| row.get(0),
1635 )
1636 .optional()?
1637 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1638
1639 let count: i64 = self.conn.query_row(
1640 "SELECT COUNT(*) FROM issue_dependencies WHERE issue_id = ?1",
1641 [&full_id],
1642 |row| row.get(0),
1643 )?;
1644 Ok(count > 0)
1645 }
1646
1647 pub fn issue_has_subtasks(&self, id: &str) -> Result<bool> {
1649 let full_id: String = self
1650 .conn
1651 .query_row(
1652 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1653 [id],
1654 |row| row.get(0),
1655 )
1656 .optional()?
1657 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1658
1659 let count: i64 = self.conn.query_row(
1660 "SELECT COUNT(*) FROM issue_dependencies WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1661 [&full_id],
1662 |row| row.get(0),
1663 )?;
1664 Ok(count > 0)
1665 }
1666
1667 pub fn get_child_issue_ids(&self, parent_id: &str) -> Result<std::collections::HashSet<String>> {
1671 let full_parent_id: String = self
1673 .conn
1674 .query_row(
1675 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1676 [parent_id],
1677 |row| row.get(0),
1678 )
1679 .optional()?
1680 .ok_or_else(|| Error::IssueNotFound { id: parent_id.to_string() })?;
1681
1682 let mut stmt = self.conn.prepare(
1683 "SELECT issue_id FROM issue_dependencies
1684 WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1685 )?;
1686
1687 let rows = stmt.query_map([&full_parent_id], |row| row.get::<_, String>(0))?;
1688
1689 let mut ids = std::collections::HashSet::new();
1690 for row in rows {
1691 ids.insert(row?);
1692 }
1693 Ok(ids)
1694 }
1695
1696 pub fn add_issue_dependency(
1702 &mut self,
1703 issue_id: &str,
1704 depends_on_id: &str,
1705 dependency_type: &str,
1706 actor: &str,
1707 ) -> Result<()> {
1708 self.mutate("add_issue_dependency", actor, |tx, ctx| {
1709 let full_issue_id: String = tx
1711 .query_row(
1712 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1713 [issue_id],
1714 |row| row.get(0),
1715 )
1716 .optional()?
1717 .ok_or_else(|| Error::IssueNotFound {
1718 id: issue_id.to_string(),
1719 })?;
1720
1721 let full_depends_on_id: String = tx
1722 .query_row(
1723 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1724 [depends_on_id],
1725 |row| row.get(0),
1726 )
1727 .optional()?
1728 .ok_or_else(|| Error::IssueNotFound {
1729 id: depends_on_id.to_string(),
1730 })?;
1731
1732 let dep_id = format!("dep_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1733 let now = chrono::Utc::now().timestamp_millis();
1734
1735 tx.execute(
1736 "INSERT OR IGNORE INTO issue_dependencies (id, issue_id, depends_on_id, dependency_type, created_at)
1737 VALUES (?1, ?2, ?3, ?4, ?5)",
1738 rusqlite::params![dep_id, full_issue_id, full_depends_on_id, dependency_type, now],
1739 )?;
1740
1741 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1742 Ok(())
1743 })
1744 }
1745
1746 pub fn remove_issue_dependency(
1752 &mut self,
1753 issue_id: &str,
1754 depends_on_id: &str,
1755 actor: &str,
1756 ) -> Result<()> {
1757 self.mutate("remove_issue_dependency", actor, |tx, ctx| {
1758 let full_issue_id: String = tx
1760 .query_row(
1761 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1762 [issue_id],
1763 |row| row.get(0),
1764 )
1765 .optional()?
1766 .ok_or_else(|| Error::IssueNotFound {
1767 id: issue_id.to_string(),
1768 })?;
1769
1770 let full_depends_on_id: String = tx
1771 .query_row(
1772 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1773 [depends_on_id],
1774 |row| row.get(0),
1775 )
1776 .optional()?
1777 .ok_or_else(|| Error::IssueNotFound {
1778 id: depends_on_id.to_string(),
1779 })?;
1780
1781 tx.execute(
1782 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND depends_on_id = ?2",
1783 rusqlite::params![full_issue_id, full_depends_on_id],
1784 )?;
1785
1786 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1787 Ok(())
1788 })
1789 }
1790
1791 pub fn clone_issue(
1797 &mut self,
1798 id: &str,
1799 new_title: Option<&str>,
1800 actor: &str,
1801 ) -> Result<Issue> {
1802 let source = self
1804 .get_issue(id, None)?
1805 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1806
1807 let new_id = format!("issue_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1808 let new_short_id = generate_short_id();
1809 let default_title = format!("Copy of {}", source.title);
1810 let title = new_title.unwrap_or(&default_title);
1811 let now = chrono::Utc::now().timestamp_millis();
1812
1813 self.mutate("clone_issue", actor, |tx, ctx| {
1814 tx.execute(
1815 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, created_at, updated_at)
1816 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'open', ?7, ?8, ?9, ?10, ?11, ?11)",
1817 rusqlite::params![
1818 new_id,
1819 new_short_id,
1820 source.project_path,
1821 title,
1822 source.description,
1823 source.details,
1824 source.priority,
1825 source.issue_type,
1826 source.plan_id,
1827 ctx.actor,
1828 now
1829 ],
1830 )?;
1831
1832 let labels: Vec<String> = tx
1834 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1")?
1835 .query_map([&source.id], |row| row.get(0))?
1836 .collect::<std::result::Result<Vec<String>, _>>()?;
1837
1838 for label in &labels {
1839 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1840 tx.execute(
1841 "INSERT INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1842 rusqlite::params![label_id, new_id, label],
1843 )?;
1844 }
1845
1846 ctx.record_event("issue", &new_id, EventType::IssueCreated);
1847 Ok(())
1848 })?;
1849
1850 self.get_issue(&new_id, None)?
1852 .ok_or_else(|| Error::Other("Failed to retrieve cloned issue".to_string()))
1853 }
1854
1855 pub fn mark_issue_duplicate(
1861 &mut self,
1862 id: &str,
1863 duplicate_of_id: &str,
1864 actor: &str,
1865 ) -> Result<()> {
1866 self.add_issue_dependency(id, duplicate_of_id, "duplicate-of", actor)?;
1868
1869 self.update_issue_status(id, "closed", actor)?;
1871
1872 Ok(())
1873 }
1874
1875 pub fn get_ready_issues(&self, project_path: &str, limit: u32) -> Result<Vec<Issue>> {
1881 let mut stmt = self.conn.prepare(
1882 "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1883 i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1884 i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1885 FROM issues i
1886 WHERE i.project_path = ?1
1887 AND i.status = 'open'
1888 AND i.assigned_to_agent IS NULL
1889 AND NOT EXISTS (
1890 SELECT 1 FROM issue_dependencies d
1891 JOIN issues dep ON dep.id = d.depends_on_id
1892 WHERE d.issue_id = i.id
1893 AND d.dependency_type = 'blocks'
1894 AND dep.status != 'closed'
1895 )
1896 ORDER BY i.priority DESC, i.created_at ASC
1897 LIMIT ?2",
1898 )?;
1899
1900 let issues = stmt
1901 .query_map(rusqlite::params![project_path, limit], |row| {
1902 Ok(Issue {
1903 id: row.get(0)?,
1904 short_id: row.get(1)?,
1905 project_path: row.get(2)?,
1906 title: row.get(3)?,
1907 description: row.get(4)?,
1908 details: row.get(5)?,
1909 status: row.get(6)?,
1910 priority: row.get(7)?,
1911 issue_type: row.get(8)?,
1912 plan_id: row.get(9)?,
1913 created_by_agent: row.get(10)?,
1914 assigned_to_agent: row.get(11)?,
1915 created_at: row.get(12)?,
1916 updated_at: row.get(13)?,
1917 closed_at: row.get(14)?,
1918 })
1919 })?
1920 .collect::<std::result::Result<Vec<_>, _>>()?;
1921
1922 Ok(issues)
1923 }
1924
1925 pub fn get_next_issue_block(
1931 &mut self,
1932 project_path: &str,
1933 count: u32,
1934 actor: &str,
1935 ) -> Result<Vec<Issue>> {
1936 let ready = self.get_ready_issues(project_path, count)?;
1937
1938 for issue in &ready {
1939 self.claim_issue(&issue.id, actor)?;
1940 }
1941
1942 let claimed: Vec<Issue> = ready
1944 .iter()
1945 .filter_map(|i| self.get_issue(&i.id, None).ok().flatten())
1946 .collect();
1947
1948 Ok(claimed)
1949 }
1950
1951 pub fn count_issues_grouped(
1957 &self,
1958 project_path: &str,
1959 group_by: &str,
1960 ) -> Result<Vec<(String, i64)>> {
1961 let column = match group_by {
1962 "status" => "status",
1963 "type" => "issue_type",
1964 "priority" => "CAST(priority AS TEXT)",
1965 "assignee" => "COALESCE(assigned_to_agent, 'unassigned')",
1966 _ => return Err(Error::InvalidArgument(
1967 format!("Invalid group_by '{group_by}'. Valid: status, type, priority, assignee")
1968 )),
1969 };
1970
1971 let sql = format!(
1972 "SELECT {column}, COUNT(*) as count FROM issues \
1973 WHERE project_path = ?1 GROUP BY {column} ORDER BY count DESC"
1974 );
1975
1976 let mut stmt = self.conn.prepare(&sql)?;
1977 let rows = stmt.query_map([project_path], |row| {
1978 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1979 })?;
1980
1981 Ok(rows.collect::<std::result::Result<Vec<_>, _>>()?)
1982 }
1983
1984 pub fn get_stale_issues(
1986 &self,
1987 project_path: &str,
1988 stale_days: u64,
1989 limit: u32,
1990 ) -> Result<Vec<Issue>> {
1991 let cutoff_ms = chrono::Utc::now().timestamp_millis()
1992 - (stale_days as i64 * 24 * 60 * 60 * 1000);
1993
1994 let mut stmt = self.conn.prepare(
1995 "SELECT id, short_id, project_path, title, description, details,
1996 status, priority, issue_type, plan_id, created_by_agent,
1997 assigned_to_agent, created_at, updated_at, closed_at
1998 FROM issues
1999 WHERE project_path = ?1
2000 AND status IN ('open', 'in_progress', 'blocked')
2001 AND updated_at < ?2
2002 ORDER BY updated_at ASC
2003 LIMIT ?3",
2004 )?;
2005
2006 let issues = stmt
2007 .query_map(rusqlite::params![project_path, cutoff_ms, limit], map_issue_row)?
2008 .collect::<std::result::Result<Vec<_>, _>>()?;
2009
2010 Ok(issues)
2011 }
2012
2013 pub fn get_blocked_issues(
2015 &self,
2016 project_path: &str,
2017 limit: u32,
2018 ) -> Result<Vec<(Issue, Vec<Issue>)>> {
2019 let mut stmt = self.conn.prepare(
2020 "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
2021 i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
2022 i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
2023 FROM issues i
2024 WHERE i.project_path = ?1
2025 AND i.status NOT IN ('closed', 'deferred')
2026 AND EXISTS (
2027 SELECT 1 FROM issue_dependencies d
2028 JOIN issues dep ON dep.id = d.depends_on_id
2029 WHERE d.issue_id = i.id
2030 AND d.dependency_type = 'blocks'
2031 AND dep.status != 'closed'
2032 )
2033 ORDER BY i.priority DESC, i.created_at ASC
2034 LIMIT ?2",
2035 )?;
2036
2037 let blocked_issues = stmt
2038 .query_map(rusqlite::params![project_path, limit], map_issue_row)?
2039 .collect::<std::result::Result<Vec<_>, _>>()?;
2040
2041 let mut blocker_stmt = self.conn.prepare(
2042 "SELECT dep.id, dep.short_id, dep.project_path, dep.title, dep.description, dep.details,
2043 dep.status, dep.priority, dep.issue_type, dep.plan_id, dep.created_by_agent,
2044 dep.assigned_to_agent, dep.created_at, dep.updated_at, dep.closed_at
2045 FROM issue_dependencies d
2046 JOIN issues dep ON dep.id = d.depends_on_id
2047 WHERE d.issue_id = ?1
2048 AND d.dependency_type = 'blocks'
2049 AND dep.status != 'closed'",
2050 )?;
2051
2052 let mut results = Vec::with_capacity(blocked_issues.len());
2053 for issue in blocked_issues {
2054 let blockers = blocker_stmt
2055 .query_map([&issue.id], map_issue_row)?
2056 .collect::<std::result::Result<Vec<_>, _>>()?;
2057 results.push((issue, blockers));
2058 }
2059
2060 Ok(results)
2061 }
2062
2063 pub fn get_epic_progress(&self, epic_id: &str) -> Result<EpicProgress> {
2065 let mut stmt = self.conn.prepare(
2066 "SELECT child.status, COUNT(*) as count
2067 FROM issue_dependencies d
2068 JOIN issues child ON child.id = d.issue_id
2069 WHERE d.depends_on_id = ?1
2070 AND d.dependency_type = 'parent-child'
2071 GROUP BY child.status",
2072 )?;
2073
2074 let rows = stmt
2075 .query_map([epic_id], |row| {
2076 Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?))
2077 })?
2078 .collect::<std::result::Result<Vec<_>, _>>()?;
2079
2080 let mut progress = EpicProgress::default();
2081 for (status, count) in rows {
2082 match status.as_str() {
2083 "closed" => progress.closed += count,
2084 "in_progress" => progress.in_progress += count,
2085 "open" => progress.open += count,
2086 "blocked" => progress.blocked += count,
2087 "deferred" => progress.deferred += count,
2088 _ => progress.open += count,
2089 }
2090 progress.total += count;
2091 }
2092
2093 Ok(progress)
2094 }
2095
2096 pub fn get_dependency_tree(&self, root_id: &str) -> Result<Vec<(Issue, i32)>> {
2099 let root = self.get_issue(root_id, None)?
2101 .ok_or_else(|| Error::IssueNotFound { id: root_id.to_string() })?;
2102
2103 let root_full_id = root.id.clone();
2104 let mut result = vec![(root, 0)];
2105 let mut queue = vec![(root_full_id.clone(), 0i32)];
2106 let mut visited = std::collections::HashSet::new();
2107 visited.insert(root_full_id);
2108
2109 let mut child_stmt = self.conn.prepare(
2110 "SELECT child.id, child.short_id, child.project_path, child.title,
2111 child.description, child.details, child.status, child.priority,
2112 child.issue_type, child.plan_id, child.created_by_agent,
2113 child.assigned_to_agent, child.created_at, child.updated_at,
2114 child.closed_at
2115 FROM issue_dependencies d
2116 JOIN issues child ON child.id = d.issue_id
2117 WHERE d.depends_on_id = ?1
2118 AND d.dependency_type IN ('parent-child', 'blocks')
2119 ORDER BY child.priority DESC, child.created_at ASC",
2120 )?;
2121
2122 while let Some((parent_id, depth)) = queue.pop() {
2123 let children = child_stmt
2124 .query_map([&parent_id], map_issue_row)?
2125 .collect::<std::result::Result<Vec<_>, _>>()?;
2126
2127 for child in children {
2128 if visited.insert(child.id.clone()) {
2129 let child_id = child.id.clone();
2130 result.push((child, depth + 1));
2131 queue.push((child_id, depth + 1));
2132 }
2133 }
2134 }
2135
2136 Ok(result)
2137 }
2138
2139 pub fn get_epics(&self, project_path: &str) -> Result<Vec<Issue>> {
2141 let mut stmt = self.conn.prepare(
2142 "SELECT id, short_id, project_path, title, description, details,
2143 status, priority, issue_type, plan_id, created_by_agent,
2144 assigned_to_agent, created_at, updated_at, closed_at
2145 FROM issues
2146 WHERE project_path = ?1
2147 AND issue_type = 'epic'
2148 AND status != 'closed'
2149 ORDER BY priority DESC, created_at ASC",
2150 )?;
2151
2152 let issues = stmt
2153 .query_map([project_path], map_issue_row)?
2154 .collect::<std::result::Result<Vec<_>, _>>()?;
2155
2156 Ok(issues)
2157 }
2158
2159 pub fn set_close_reason(
2161 &mut self,
2162 id: &str,
2163 reason: &str,
2164 actor: &str,
2165 ) -> Result<()> {
2166 let now = chrono::Utc::now().timestamp_millis();
2167 self.mutate("set_close_reason", actor, |tx, _ctx| {
2168 let rows = tx.execute(
2169 "UPDATE issues SET close_reason = ?1, updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
2170 rusqlite::params![reason, now, id],
2171 )?;
2172 if rows == 0 {
2173 return Err(Error::IssueNotFound { id: id.to_string() });
2174 }
2175 Ok(())
2176 })
2177 }
2178
2179 pub fn get_close_reason(&self, id: &str) -> Result<Option<String>> {
2181 let result = self.conn.query_row(
2182 "SELECT close_reason FROM issues WHERE id = ?1 OR short_id = ?1",
2183 [id],
2184 |row| row.get(0),
2185 );
2186 match result {
2187 Ok(reason) => Ok(reason),
2188 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2189 Err(e) => Err(e.into()),
2190 }
2191 }
2192
2193 #[allow(clippy::too_many_arguments)]
2203 pub fn create_checkpoint(
2204 &mut self,
2205 id: &str,
2206 session_id: &str,
2207 name: &str,
2208 description: Option<&str>,
2209 git_status: Option<&str>,
2210 git_branch: Option<&str>,
2211 actor: &str,
2212 ) -> Result<()> {
2213 let now = chrono::Utc::now().timestamp_millis();
2214
2215 self.mutate("create_checkpoint", actor, |tx, ctx| {
2216 tx.execute(
2217 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
2218 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2219 rusqlite::params![id, session_id, name, description, git_status, git_branch, now],
2220 )?;
2221
2222 ctx.record_event("checkpoint", id, EventType::CheckpointCreated);
2223
2224 Ok(())
2225 })
2226 }
2227
2228 pub fn add_checkpoint_item(
2234 &mut self,
2235 checkpoint_id: &str,
2236 context_item_id: &str,
2237 actor: &str,
2238 ) -> Result<()> {
2239 let id = format!("cpitem_{}", &uuid::Uuid::new_v4().to_string()[..12]);
2240 self.mutate("add_checkpoint_item", actor, |tx, _ctx| {
2241 tx.execute(
2242 "INSERT OR IGNORE INTO checkpoint_items (id, checkpoint_id, context_item_id)
2243 VALUES (?1, ?2, ?3)",
2244 rusqlite::params![id, checkpoint_id, context_item_id],
2245 )?;
2246
2247 Ok(())
2248 })
2249 }
2250
2251 pub fn count_items_since_last_checkpoint(&self, session_id: &str) -> Result<i64> {
2259 let last_checkpoint_time: Option<i64> = self.conn.query_row(
2260 "SELECT MAX(created_at) FROM checkpoints WHERE session_id = ?1",
2261 [session_id],
2262 |row| row.get(0),
2263 )?;
2264
2265 let count = if let Some(ts) = last_checkpoint_time {
2266 self.conn.query_row(
2267 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND created_at > ?2",
2268 rusqlite::params![session_id, ts],
2269 |row| row.get(0),
2270 )?
2271 } else {
2272 self.conn.query_row(
2274 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1",
2275 [session_id],
2276 |row| row.get(0),
2277 )?
2278 };
2279
2280 Ok(count)
2281 }
2282
2283 pub fn list_checkpoints(
2289 &self,
2290 session_id: &str,
2291 limit: Option<u32>,
2292 ) -> Result<Vec<Checkpoint>> {
2293 let limit = limit.unwrap_or(20);
2294
2295 let mut stmt = self.conn.prepare(
2296 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2297 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2298 FROM checkpoints c
2299 WHERE c.session_id = ?1
2300 ORDER BY c.created_at DESC
2301 LIMIT ?2",
2302 )?;
2303
2304 let rows = stmt.query_map(rusqlite::params![session_id, limit], |row| {
2305 Ok(Checkpoint {
2306 id: row.get(0)?,
2307 session_id: row.get(1)?,
2308 name: row.get(2)?,
2309 description: row.get(3)?,
2310 git_status: row.get(4)?,
2311 git_branch: row.get(5)?,
2312 created_at: row.get(6)?,
2313 item_count: row.get(7)?,
2314 })
2315 })?;
2316
2317 rows.collect::<std::result::Result<Vec<_>, _>>()
2318 .map_err(Error::from)
2319 }
2320
2321 pub fn get_checkpoint(&self, id: &str) -> Result<Option<Checkpoint>> {
2327 let mut stmt = self.conn.prepare(
2328 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2329 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2330 FROM checkpoints c
2331 WHERE c.id = ?1",
2332 )?;
2333
2334 let checkpoint = stmt
2335 .query_row([id], |row| {
2336 Ok(Checkpoint {
2337 id: row.get(0)?,
2338 session_id: row.get(1)?,
2339 name: row.get(2)?,
2340 description: row.get(3)?,
2341 git_status: row.get(4)?,
2342 git_branch: row.get(5)?,
2343 created_at: row.get(6)?,
2344 item_count: row.get(7)?,
2345 })
2346 })
2347 .optional()?;
2348
2349 Ok(checkpoint)
2350 }
2351
2352 pub fn delete_checkpoint(&mut self, id: &str, actor: &str) -> Result<()> {
2358 self.mutate("delete_checkpoint", actor, |tx, ctx| {
2359 let project_path: Option<Option<String>> = tx
2361 .query_row(
2362 "SELECT s.project_path FROM checkpoints c
2363 JOIN sessions s ON c.session_id = s.id
2364 WHERE c.id = ?1",
2365 [id],
2366 |row| row.get(0),
2367 )
2368 .optional()?;
2369
2370 tx.execute("DELETE FROM checkpoint_items WHERE checkpoint_id = ?1", [id])?;
2372
2373 let rows = tx.execute("DELETE FROM checkpoints WHERE id = ?1", [id])?;
2375
2376 if rows == 0 {
2377 return Err(Error::CheckpointNotFound { id: id.to_string() });
2378 }
2379
2380 ctx.record_event("checkpoint", id, EventType::CheckpointDeleted);
2381
2382 if let Some(Some(path)) = project_path {
2384 let now = chrono::Utc::now().timestamp_millis();
2385 tx.execute(
2386 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2387 VALUES ('checkpoint', ?1, ?2, ?3, ?4, 0)
2388 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2389 deleted_at = excluded.deleted_at,
2390 deleted_by = excluded.deleted_by,
2391 exported = 0",
2392 rusqlite::params![id, path, now, ctx.actor],
2393 )?;
2394 }
2395
2396 Ok(())
2397 })
2398 }
2399
2400 pub fn get_checkpoint_items(&self, checkpoint_id: &str) -> Result<Vec<ContextItem>> {
2406 let mut stmt = self.conn.prepare(
2407 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
2408 ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
2409 FROM context_items ci
2410 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2411 WHERE cpi.checkpoint_id = ?1
2412 ORDER BY ci.priority DESC, ci.created_at DESC",
2413 )?;
2414
2415 let rows = stmt.query_map([checkpoint_id], |row| {
2416 Ok(ContextItem {
2417 id: row.get(0)?,
2418 session_id: row.get(1)?,
2419 key: row.get(2)?,
2420 value: row.get(3)?,
2421 category: row.get(4)?,
2422 priority: row.get(5)?,
2423 channel: row.get(6)?,
2424 tags: row.get(7)?,
2425 size: row.get(8)?,
2426 created_at: row.get(9)?,
2427 updated_at: row.get(10)?,
2428 })
2429 })?;
2430
2431 rows.collect::<std::result::Result<Vec<_>, _>>()
2432 .map_err(Error::from)
2433 }
2434
2435 pub fn restore_checkpoint(
2444 &mut self,
2445 checkpoint_id: &str,
2446 target_session_id: &str,
2447 restore_categories: Option<&[String]>,
2448 restore_tags: Option<&[String]>,
2449 actor: &str,
2450 ) -> Result<usize> {
2451 let mut items = self.get_checkpoint_items(checkpoint_id)?;
2453
2454 if let Some(categories) = restore_categories {
2456 items.retain(|item| categories.contains(&item.category));
2457 }
2458
2459 if let Some(tags) = restore_tags {
2461 items.retain(|item| {
2462 if let Some(ref item_tags) = item.tags {
2464 if let Ok(parsed_tags) = serde_json::from_str::<Vec<String>>(item_tags) {
2465 return tags.iter().any(|t| parsed_tags.contains(t));
2466 }
2467 }
2468 false
2469 });
2470 }
2471
2472 let now = chrono::Utc::now().timestamp_millis();
2473
2474 self.mutate("restore_checkpoint", actor, |tx, ctx| {
2475 tx.execute(
2477 "DELETE FROM context_items WHERE session_id = ?1",
2478 [target_session_id],
2479 )?;
2480
2481 let mut restored = 0;
2483 for item in &items {
2484 let new_id = uuid::Uuid::new_v4().to_string();
2485 let size = item.value.len() as i64;
2486
2487 tx.execute(
2488 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
2489 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)",
2490 rusqlite::params![
2491 new_id,
2492 target_session_id,
2493 item.key,
2494 item.value,
2495 item.category,
2496 item.priority,
2497 item.channel,
2498 item.tags,
2499 size,
2500 now,
2501 ],
2502 )?;
2503
2504 ctx.record_event("context_item", &new_id, EventType::ItemCreated);
2505 restored += 1;
2506 }
2507
2508 Ok(restored)
2509 })
2510 }
2511
2512 pub fn remove_checkpoint_item(
2518 &mut self,
2519 checkpoint_id: &str,
2520 context_item_id: &str,
2521 actor: &str,
2522 ) -> Result<()> {
2523 self.mutate("remove_checkpoint_item", actor, |tx, _ctx| {
2524 tx.execute(
2525 "DELETE FROM checkpoint_items WHERE checkpoint_id = ?1 AND context_item_id = ?2",
2526 rusqlite::params![checkpoint_id, context_item_id],
2527 )?;
2528 Ok(())
2529 })
2530 }
2531
2532 pub fn add_checkpoint_items_by_keys(
2538 &mut self,
2539 checkpoint_id: &str,
2540 session_id: &str,
2541 keys: &[String],
2542 actor: &str,
2543 ) -> Result<usize> {
2544 let mut added = 0;
2545
2546 for key in keys {
2547 let item_id: Option<String> = self.conn.query_row(
2549 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
2550 rusqlite::params![session_id, key],
2551 |row| row.get(0),
2552 ).optional()?;
2553
2554 if let Some(id) = item_id {
2555 self.add_checkpoint_item(checkpoint_id, &id, actor)?;
2556 added += 1;
2557 }
2558 }
2559
2560 Ok(added)
2561 }
2562
2563 pub fn remove_checkpoint_items_by_keys(
2569 &mut self,
2570 checkpoint_id: &str,
2571 keys: &[String],
2572 actor: &str,
2573 ) -> Result<usize> {
2574 let mut removed = 0;
2575
2576 for key in keys {
2577 let item_id: Option<String> = self.conn.query_row(
2579 "SELECT ci.id FROM context_items ci
2580 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2581 WHERE cpi.checkpoint_id = ?1 AND ci.key = ?2",
2582 rusqlite::params![checkpoint_id, key],
2583 |row| row.get(0),
2584 ).optional()?;
2585
2586 if let Some(id) = item_id {
2587 self.remove_checkpoint_item(checkpoint_id, &id, actor)?;
2588 removed += 1;
2589 }
2590 }
2591
2592 Ok(removed)
2593 }
2594
2595 #[allow(clippy::too_many_arguments)]
2605 pub fn save_memory(
2606 &mut self,
2607 id: &str,
2608 project_path: &str,
2609 key: &str,
2610 value: &str,
2611 category: &str,
2612 actor: &str,
2613 ) -> Result<()> {
2614 let now = chrono::Utc::now().timestamp_millis();
2615
2616 self.mutate("save_memory", actor, |tx, ctx| {
2617 tx.execute(
2618 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
2619 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
2620 ON CONFLICT(project_path, key) DO UPDATE SET
2621 value = excluded.value,
2622 category = excluded.category,
2623 updated_at = excluded.updated_at",
2624 rusqlite::params![id, project_path, key, value, category, now],
2625 )?;
2626
2627 ctx.record_event("memory", id, EventType::MemorySaved);
2628
2629 Ok(())
2630 })
2631 }
2632
2633 pub fn get_memory(&self, project_path: &str, key: &str) -> Result<Option<Memory>> {
2639 let mut stmt = self.conn.prepare(
2640 "SELECT id, project_path, key, value, category, created_at, updated_at
2641 FROM project_memory WHERE project_path = ?1 AND key = ?2",
2642 )?;
2643
2644 let memory = stmt
2645 .query_row(rusqlite::params![project_path, key], |row| {
2646 Ok(Memory {
2647 id: row.get(0)?,
2648 project_path: row.get(1)?,
2649 key: row.get(2)?,
2650 value: row.get(3)?,
2651 category: row.get(4)?,
2652 created_at: row.get(5)?,
2653 updated_at: row.get(6)?,
2654 })
2655 })
2656 .optional()?;
2657
2658 Ok(memory)
2659 }
2660
2661 pub fn list_memory(
2667 &self,
2668 project_path: &str,
2669 category: Option<&str>,
2670 ) -> Result<Vec<Memory>> {
2671 let map_row = |row: &rusqlite::Row| -> rusqlite::Result<Memory> {
2672 Ok(Memory {
2673 id: row.get(0)?,
2674 project_path: row.get(1)?,
2675 key: row.get(2)?,
2676 value: row.get(3)?,
2677 category: row.get(4)?,
2678 created_at: row.get(5)?,
2679 updated_at: row.get(6)?,
2680 })
2681 };
2682
2683 let rows = if let Some(cat) = category {
2684 let mut stmt = self.conn.prepare(
2685 "SELECT id, project_path, key, value, category, created_at, updated_at
2686 FROM project_memory WHERE project_path = ?1 AND category = ?2
2687 ORDER BY key ASC",
2688 )?;
2689 stmt.query_map(rusqlite::params![project_path, cat], map_row)?
2690 .collect::<std::result::Result<Vec<_>, _>>()
2691 } else {
2692 let mut stmt = self.conn.prepare(
2693 "SELECT id, project_path, key, value, category, created_at, updated_at
2694 FROM project_memory WHERE project_path = ?1
2695 ORDER BY key ASC",
2696 )?;
2697 stmt.query_map(rusqlite::params![project_path], map_row)?
2698 .collect::<std::result::Result<Vec<_>, _>>()
2699 };
2700
2701 rows.map_err(Error::from)
2702 }
2703
2704 pub fn delete_memory(
2710 &mut self,
2711 project_path: &str,
2712 key: &str,
2713 actor: &str,
2714 ) -> Result<()> {
2715 let proj_path = project_path.to_string();
2716 self.mutate("delete_memory", actor, |tx, ctx| {
2717 let id: Option<String> = tx
2719 .query_row(
2720 "SELECT id FROM project_memory WHERE project_path = ?1 AND key = ?2",
2721 rusqlite::params![proj_path, key],
2722 |row| row.get(0),
2723 )
2724 .optional()?;
2725
2726 let rows = tx.execute(
2727 "DELETE FROM project_memory WHERE project_path = ?1 AND key = ?2",
2728 rusqlite::params![proj_path, key],
2729 )?;
2730
2731 if rows > 0 {
2732 if let Some(ref mem_id) = id {
2733 ctx.record_event("memory", mem_id, EventType::MemoryDeleted);
2734
2735 let now = chrono::Utc::now().timestamp_millis();
2737 tx.execute(
2738 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2739 VALUES ('memory', ?1, ?2, ?3, ?4, 0)
2740 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2741 deleted_at = excluded.deleted_at,
2742 deleted_by = excluded.deleted_by,
2743 exported = 0",
2744 rusqlite::params![mem_id, proj_path, now, ctx.actor],
2745 )?;
2746 }
2747 }
2748
2749 Ok(())
2750 })
2751 }
2752
2753 pub fn get_dirty_sessions(&self) -> Result<Vec<String>> {
2763 let mut stmt = self.conn.prepare(
2764 "SELECT session_id FROM dirty_sessions ORDER BY marked_at ASC",
2765 )?;
2766 let rows = stmt.query_map([], |row| row.get(0))?;
2767 rows.collect::<std::result::Result<Vec<_>, _>>()
2768 .map_err(Error::from)
2769 }
2770
2771 pub fn get_dirty_issues(&self) -> Result<Vec<String>> {
2777 let mut stmt = self.conn.prepare(
2778 "SELECT issue_id FROM dirty_issues ORDER BY marked_at ASC",
2779 )?;
2780 let rows = stmt.query_map([], |row| row.get(0))?;
2781 rows.collect::<std::result::Result<Vec<_>, _>>()
2782 .map_err(Error::from)
2783 }
2784
2785 pub fn get_dirty_context_items(&self) -> Result<Vec<String>> {
2791 let mut stmt = self.conn.prepare(
2792 "SELECT item_id FROM dirty_context_items ORDER BY marked_at ASC",
2793 )?;
2794 let rows = stmt.query_map([], |row| row.get(0))?;
2795 rows.collect::<std::result::Result<Vec<_>, _>>()
2796 .map_err(Error::from)
2797 }
2798
2799 pub fn clear_dirty_sessions(&mut self, ids: &[String]) -> Result<()> {
2805 if ids.is_empty() {
2806 return Ok(());
2807 }
2808 let placeholders = vec!["?"; ids.len()].join(",");
2809 let sql = format!("DELETE FROM dirty_sessions WHERE session_id IN ({placeholders})");
2810 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2811 self.conn.execute(&sql, params.as_slice())?;
2812 Ok(())
2813 }
2814
2815 pub fn clear_dirty_issues(&mut self, ids: &[String]) -> Result<()> {
2821 if ids.is_empty() {
2822 return Ok(());
2823 }
2824 let placeholders = vec!["?"; ids.len()].join(",");
2825 let sql = format!("DELETE FROM dirty_issues WHERE issue_id IN ({placeholders})");
2826 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2827 self.conn.execute(&sql, params.as_slice())?;
2828 Ok(())
2829 }
2830
2831 pub fn clear_dirty_context_items(&mut self, ids: &[String]) -> Result<()> {
2837 if ids.is_empty() {
2838 return Ok(());
2839 }
2840 let placeholders = vec!["?"; ids.len()].join(",");
2841 let sql = format!("DELETE FROM dirty_context_items WHERE item_id IN ({placeholders})");
2842 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2843 self.conn.execute(&sql, params.as_slice())?;
2844 Ok(())
2845 }
2846
2847 pub fn get_export_hash(&self, entity_type: &str, entity_id: &str) -> Result<Option<String>> {
2853 let mut stmt = self.conn.prepare(
2854 "SELECT content_hash FROM export_hashes WHERE entity_type = ?1 AND entity_id = ?2",
2855 )?;
2856 let hash = stmt
2857 .query_row(rusqlite::params![entity_type, entity_id], |row| row.get(0))
2858 .optional()?;
2859 Ok(hash)
2860 }
2861
2862 pub fn set_export_hash(&mut self, entity_type: &str, entity_id: &str, hash: &str) -> Result<()> {
2868 let now = chrono::Utc::now().timestamp_millis();
2869 self.conn.execute(
2870 "INSERT INTO export_hashes (entity_type, entity_id, content_hash, exported_at)
2871 VALUES (?1, ?2, ?3, ?4)
2872 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2873 content_hash = excluded.content_hash,
2874 exported_at = excluded.exported_at",
2875 rusqlite::params![entity_type, entity_id, hash, now],
2876 )?;
2877 Ok(())
2878 }
2879
2880 pub fn record_deletion(
2893 &mut self,
2894 entity_type: &str,
2895 entity_id: &str,
2896 project_path: &str,
2897 actor: &str,
2898 ) -> Result<()> {
2899 let now = chrono::Utc::now().timestamp_millis();
2900 self.conn.execute(
2901 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2902 VALUES (?1, ?2, ?3, ?4, ?5, 0)
2903 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2904 deleted_at = excluded.deleted_at,
2905 deleted_by = excluded.deleted_by,
2906 exported = 0",
2907 rusqlite::params![entity_type, entity_id, project_path, now, actor],
2908 )?;
2909 Ok(())
2910 }
2911
2912 pub fn get_pending_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2918 let mut stmt = self.conn.prepare(
2919 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2920 FROM sync_deletions
2921 WHERE project_path = ?1 AND exported = 0
2922 ORDER BY deleted_at ASC",
2923 )?;
2924 let rows = stmt.query_map([project_path], |row| {
2925 Ok(SyncDeletion {
2926 id: row.get(0)?,
2927 entity_type: row.get(1)?,
2928 entity_id: row.get(2)?,
2929 project_path: row.get(3)?,
2930 deleted_at: row.get(4)?,
2931 deleted_by: row.get(5)?,
2932 })
2933 })?;
2934 rows.collect::<std::result::Result<Vec<_>, _>>()
2935 .map_err(Error::from)
2936 }
2937
2938 pub fn get_all_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2944 let mut stmt = self.conn.prepare(
2945 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2946 FROM sync_deletions
2947 WHERE project_path = ?1
2948 ORDER BY deleted_at ASC",
2949 )?;
2950 let rows = stmt.query_map([project_path], |row| {
2951 Ok(SyncDeletion {
2952 id: row.get(0)?,
2953 entity_type: row.get(1)?,
2954 entity_id: row.get(2)?,
2955 project_path: row.get(3)?,
2956 deleted_at: row.get(4)?,
2957 deleted_by: row.get(5)?,
2958 })
2959 })?;
2960 rows.collect::<std::result::Result<Vec<_>, _>>()
2961 .map_err(Error::from)
2962 }
2963
2964 pub fn mark_deletions_exported(&mut self, ids: &[i64]) -> Result<()> {
2970 if ids.is_empty() {
2971 return Ok(());
2972 }
2973 let placeholders = vec!["?"; ids.len()].join(",");
2974 let sql = format!("UPDATE sync_deletions SET exported = 1 WHERE id IN ({placeholders})");
2975 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2976 self.conn.execute(&sql, params.as_slice())?;
2977 Ok(())
2978 }
2979
2980 pub fn count_pending_deletions(&self, project_path: &str) -> Result<usize> {
2986 let count: i64 = self.conn.query_row(
2987 "SELECT COUNT(*) FROM sync_deletions WHERE project_path = ?1 AND exported = 0",
2988 [project_path],
2989 |row| row.get(0),
2990 )?;
2991 Ok(count as usize)
2992 }
2993
2994 pub fn apply_deletion(&mut self, entity_type: &str, entity_id: &str) -> Result<bool> {
3000 let sql = match entity_type {
3001 "session" => "DELETE FROM sessions WHERE id = ?1",
3002 "issue" => "DELETE FROM issues WHERE id = ?1",
3003 "context_item" => "DELETE FROM context_items WHERE id = ?1",
3004 "memory" => "DELETE FROM project_memory WHERE id = ?1",
3005 "checkpoint" => "DELETE FROM checkpoints WHERE id = ?1",
3006 _ => return Ok(false),
3007 };
3008 let rows = self.conn.execute(sql, [entity_id])?;
3009 Ok(rows > 0)
3010 }
3011
3012 pub fn get_all_sessions(&self) -> Result<Vec<Session>> {
3018 let mut stmt = self.conn.prepare(
3019 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
3020 FROM sessions ORDER BY created_at ASC",
3021 )?;
3022 let rows = stmt.query_map([], |row| {
3023 Ok(Session {
3024 id: row.get(0)?,
3025 name: row.get(1)?,
3026 description: row.get(2)?,
3027 branch: row.get(3)?,
3028 channel: row.get(4)?,
3029 project_path: row.get(5)?,
3030 status: row.get(6)?,
3031 ended_at: row.get(7)?,
3032 created_at: row.get(8)?,
3033 updated_at: row.get(9)?,
3034 })
3035 })?;
3036 rows.collect::<std::result::Result<Vec<_>, _>>()
3037 .map_err(Error::from)
3038 }
3039
3040 pub fn get_all_issues(&self) -> Result<Vec<Issue>> {
3046 let mut stmt = self.conn.prepare(
3047 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3048 FROM issues ORDER BY created_at ASC",
3049 )?;
3050 let rows = stmt.query_map([], map_issue_row)?;
3051 rows.collect::<std::result::Result<Vec<_>, _>>()
3052 .map_err(Error::from)
3053 }
3054
3055 pub fn get_all_context_items(
3061 &self,
3062 category: Option<&str>,
3063 priority: Option<&str>,
3064 limit: Option<u32>,
3065 ) -> Result<Vec<ContextItem>> {
3066 let mut sql = String::from(
3067 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3068 FROM context_items WHERE 1=1",
3069 );
3070
3071 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
3072
3073 if let Some(cat) = category {
3074 sql.push_str(" AND category = ?");
3075 params.push(Box::new(cat.to_string()));
3076 }
3077
3078 if let Some(pri) = priority {
3079 sql.push_str(" AND priority = ?");
3080 params.push(Box::new(pri.to_string()));
3081 }
3082
3083 sql.push_str(" ORDER BY created_at DESC");
3084 if let Some(lim) = limit {
3085 sql.push_str(" LIMIT ?");
3086 params.push(Box::new(lim));
3087 }
3088
3089 let mut stmt = self.conn.prepare(&sql)?;
3090 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
3091
3092 let rows = stmt.query_map(params_refs.as_slice(), |row| {
3093 Ok(ContextItem {
3094 id: row.get(0)?,
3095 session_id: row.get(1)?,
3096 key: row.get(2)?,
3097 value: row.get(3)?,
3098 category: row.get(4)?,
3099 priority: row.get(5)?,
3100 channel: row.get(6)?,
3101 tags: row.get(7)?,
3102 size: row.get(8)?,
3103 created_at: row.get(9)?,
3104 updated_at: row.get(10)?,
3105 })
3106 })?;
3107 rows.collect::<std::result::Result<Vec<_>, _>>()
3108 .map_err(Error::from)
3109 }
3110
3111 pub fn get_all_memory(&self) -> Result<Vec<Memory>> {
3117 let mut stmt = self.conn.prepare(
3118 "SELECT id, project_path, key, value, category, created_at, updated_at
3119 FROM project_memory ORDER BY created_at ASC",
3120 )?;
3121 let rows = stmt.query_map([], |row| {
3122 Ok(Memory {
3123 id: row.get(0)?,
3124 project_path: row.get(1)?,
3125 key: row.get(2)?,
3126 value: row.get(3)?,
3127 category: row.get(4)?,
3128 created_at: row.get(5)?,
3129 updated_at: row.get(6)?,
3130 })
3131 })?;
3132 rows.collect::<std::result::Result<Vec<_>, _>>()
3133 .map_err(Error::from)
3134 }
3135
3136 pub fn get_all_issue_short_ids(&self) -> Result<Vec<String>> {
3141 let mut stmt = self
3142 .conn
3143 .prepare("SELECT short_id FROM issues WHERE short_id IS NOT NULL")?;
3144 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3145 rows.collect::<std::result::Result<Vec<_>, _>>()
3146 .map_err(Error::from)
3147 }
3148
3149 pub fn get_all_session_ids(&self) -> Result<Vec<String>> {
3151 let mut stmt = self.conn.prepare("SELECT id FROM sessions")?;
3152 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3153 rows.collect::<std::result::Result<Vec<_>, _>>()
3154 .map_err(Error::from)
3155 }
3156
3157 pub fn get_all_checkpoint_ids(&self) -> Result<Vec<String>> {
3159 let mut stmt = self.conn.prepare("SELECT id FROM checkpoints")?;
3160 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3161 rows.collect::<std::result::Result<Vec<_>, _>>()
3162 .map_err(Error::from)
3163 }
3164
3165 pub fn get_all_checkpoints(&self) -> Result<Vec<Checkpoint>> {
3171 let mut stmt = self.conn.prepare(
3172 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3173 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3174 FROM checkpoints c ORDER BY c.created_at ASC",
3175 )?;
3176 let rows = stmt.query_map([], |row| {
3177 Ok(Checkpoint {
3178 id: row.get(0)?,
3179 session_id: row.get(1)?,
3180 name: row.get(2)?,
3181 description: row.get(3)?,
3182 git_status: row.get(4)?,
3183 git_branch: row.get(5)?,
3184 created_at: row.get(6)?,
3185 item_count: row.get(7)?,
3186 })
3187 })?;
3188 rows.collect::<std::result::Result<Vec<_>, _>>()
3189 .map_err(Error::from)
3190 }
3191
3192 pub fn get_context_item(&self, id: &str) -> Result<Option<ContextItem>> {
3198 let mut stmt = self.conn.prepare(
3199 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3200 FROM context_items WHERE id = ?1",
3201 )?;
3202 let item = stmt
3203 .query_row([id], |row| {
3204 Ok(ContextItem {
3205 id: row.get(0)?,
3206 session_id: row.get(1)?,
3207 key: row.get(2)?,
3208 value: row.get(3)?,
3209 category: row.get(4)?,
3210 priority: row.get(5)?,
3211 channel: row.get(6)?,
3212 tags: row.get(7)?,
3213 size: row.get(8)?,
3214 created_at: row.get(9)?,
3215 updated_at: row.get(10)?,
3216 })
3217 })
3218 .optional()?;
3219 Ok(item)
3220 }
3221
3222 pub fn get_sessions_by_project(&self, project_path: &str) -> Result<Vec<Session>> {
3232 let mut stmt = self.conn.prepare(
3233 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
3234 FROM sessions WHERE project_path = ?1 ORDER BY created_at ASC",
3235 )?;
3236 let rows = stmt.query_map([project_path], |row| {
3237 Ok(Session {
3238 id: row.get(0)?,
3239 name: row.get(1)?,
3240 description: row.get(2)?,
3241 branch: row.get(3)?,
3242 channel: row.get(4)?,
3243 project_path: row.get(5)?,
3244 status: row.get(6)?,
3245 ended_at: row.get(7)?,
3246 created_at: row.get(8)?,
3247 updated_at: row.get(9)?,
3248 })
3249 })?;
3250 rows.collect::<std::result::Result<Vec<_>, _>>()
3251 .map_err(Error::from)
3252 }
3253
3254 pub fn get_issues_by_project(&self, project_path: &str) -> Result<Vec<Issue>> {
3260 let mut stmt = self.conn.prepare(
3261 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3262 FROM issues WHERE project_path = ?1 ORDER BY created_at ASC",
3263 )?;
3264 let rows = stmt.query_map([project_path], map_issue_row)?;
3265 rows.collect::<std::result::Result<Vec<_>, _>>()
3266 .map_err(Error::from)
3267 }
3268
3269 pub fn get_context_items_by_project(&self, project_path: &str) -> Result<Vec<ContextItem>> {
3278 let mut stmt = self.conn.prepare(
3279 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority, ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
3280 FROM context_items ci
3281 INNER JOIN sessions s ON ci.session_id = s.id
3282 WHERE s.project_path = ?1
3283 ORDER BY ci.created_at ASC",
3284 )?;
3285 let rows = stmt.query_map([project_path], |row| {
3286 Ok(ContextItem {
3287 id: row.get(0)?,
3288 session_id: row.get(1)?,
3289 key: row.get(2)?,
3290 value: row.get(3)?,
3291 category: row.get(4)?,
3292 priority: row.get(5)?,
3293 channel: row.get(6)?,
3294 tags: row.get(7)?,
3295 size: row.get(8)?,
3296 created_at: row.get(9)?,
3297 updated_at: row.get(10)?,
3298 })
3299 })?;
3300 rows.collect::<std::result::Result<Vec<_>, _>>()
3301 .map_err(Error::from)
3302 }
3303
3304 pub fn get_memory_by_project(&self, project_path: &str) -> Result<Vec<Memory>> {
3310 let mut stmt = self.conn.prepare(
3311 "SELECT id, project_path, key, value, category, created_at, updated_at
3312 FROM project_memory WHERE project_path = ?1 ORDER BY created_at ASC",
3313 )?;
3314 let rows = stmt.query_map([project_path], |row| {
3315 Ok(Memory {
3316 id: row.get(0)?,
3317 project_path: row.get(1)?,
3318 key: row.get(2)?,
3319 value: row.get(3)?,
3320 category: row.get(4)?,
3321 created_at: row.get(5)?,
3322 updated_at: row.get(6)?,
3323 })
3324 })?;
3325 rows.collect::<std::result::Result<Vec<_>, _>>()
3326 .map_err(Error::from)
3327 }
3328
3329 pub fn get_checkpoints_by_project(&self, project_path: &str) -> Result<Vec<Checkpoint>> {
3338 let mut stmt = self.conn.prepare(
3339 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3340 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3341 FROM checkpoints c
3342 INNER JOIN sessions s ON c.session_id = s.id
3343 WHERE s.project_path = ?1
3344 ORDER BY c.created_at ASC",
3345 )?;
3346 let rows = stmt.query_map([project_path], |row| {
3347 Ok(Checkpoint {
3348 id: row.get(0)?,
3349 session_id: row.get(1)?,
3350 name: row.get(2)?,
3351 description: row.get(3)?,
3352 git_status: row.get(4)?,
3353 git_branch: row.get(5)?,
3354 created_at: row.get(6)?,
3355 item_count: row.get(7)?,
3356 })
3357 })?;
3358 rows.collect::<std::result::Result<Vec<_>, _>>()
3359 .map_err(Error::from)
3360 }
3361
3362 pub fn get_dirty_sessions_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3368 let mut stmt = self.conn.prepare(
3369 "SELECT ds.session_id
3370 FROM dirty_sessions ds
3371 INNER JOIN sessions s ON ds.session_id = s.id
3372 WHERE s.project_path = ?1",
3373 )?;
3374 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3375 rows.collect::<std::result::Result<Vec<_>, _>>()
3376 .map_err(Error::from)
3377 }
3378
3379 pub fn get_dirty_issues_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3385 let mut stmt = self.conn.prepare(
3386 "SELECT di.issue_id
3387 FROM dirty_issues di
3388 INNER JOIN issues i ON di.issue_id = i.id
3389 WHERE i.project_path = ?1",
3390 )?;
3391 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3392 rows.collect::<std::result::Result<Vec<_>, _>>()
3393 .map_err(Error::from)
3394 }
3395
3396 pub fn get_dirty_context_items_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3402 let mut stmt = self.conn.prepare(
3403 "SELECT dci.item_id
3404 FROM dirty_context_items dci
3405 INNER JOIN context_items ci ON dci.item_id = ci.id
3406 INNER JOIN sessions s ON ci.session_id = s.id
3407 WHERE s.project_path = ?1",
3408 )?;
3409 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3410 rows.collect::<std::result::Result<Vec<_>, _>>()
3411 .map_err(Error::from)
3412 }
3413
3414 pub fn backfill_dirty_for_project(&mut self, project_path: &str) -> Result<BackfillStats> {
3424 let now = chrono::Utc::now().timestamp_millis();
3425
3426 let sessions_count = self.conn.execute(
3428 "INSERT OR IGNORE INTO dirty_sessions (session_id, marked_at)
3429 SELECT id, ?1 FROM sessions WHERE project_path = ?2",
3430 rusqlite::params![now, project_path],
3431 )?;
3432
3433 let issues_count = self.conn.execute(
3435 "INSERT OR IGNORE INTO dirty_issues (issue_id, marked_at)
3436 SELECT id, ?1 FROM issues WHERE project_path = ?2",
3437 rusqlite::params![now, project_path],
3438 )?;
3439
3440 let context_items_count = self.conn.execute(
3442 "INSERT OR IGNORE INTO dirty_context_items (item_id, marked_at)
3443 SELECT ci.id, ?1 FROM context_items ci
3444 INNER JOIN sessions s ON ci.session_id = s.id
3445 WHERE s.project_path = ?2",
3446 rusqlite::params![now, project_path],
3447 )?;
3448
3449 let plans_count = self.conn.execute(
3451 "INSERT OR IGNORE INTO dirty_plans (plan_id, marked_at)
3452 SELECT id, ?1 FROM plans WHERE project_path = ?2",
3453 rusqlite::params![now, project_path],
3454 )?;
3455
3456 let time_entries_count = self.conn.execute(
3458 "INSERT OR IGNORE INTO dirty_time_entries (time_entry_id, marked_at)
3459 SELECT id, ?1 FROM time_entries WHERE project_path = ?2",
3460 rusqlite::params![now, project_path],
3461 )?;
3462
3463 Ok(BackfillStats {
3464 sessions: sessions_count,
3465 issues: issues_count,
3466 context_items: context_items_count,
3467 plans: plans_count,
3468 time_entries: time_entries_count,
3469 })
3470 }
3471
3472 pub fn get_project_counts(&self, project_path: &str) -> Result<ProjectCounts> {
3478 let sessions: i64 = self.conn.query_row(
3479 "SELECT COUNT(*) FROM sessions WHERE project_path = ?1",
3480 [project_path],
3481 |row| row.get(0),
3482 )?;
3483
3484 let issues: i64 = self.conn.query_row(
3485 "SELECT COUNT(*) FROM issues WHERE project_path = ?1",
3486 [project_path],
3487 |row| row.get(0),
3488 )?;
3489
3490 let context_items: i64 = self.conn.query_row(
3491 "SELECT COUNT(*) FROM context_items ci
3492 INNER JOIN sessions s ON ci.session_id = s.id
3493 WHERE s.project_path = ?1",
3494 [project_path],
3495 |row| row.get(0),
3496 )?;
3497
3498 let memories: i64 = self.conn.query_row(
3499 "SELECT COUNT(*) FROM project_memory WHERE project_path = ?1",
3500 [project_path],
3501 |row| row.get(0),
3502 )?;
3503
3504 let checkpoints: i64 = self.conn.query_row(
3505 "SELECT COUNT(*) FROM checkpoints c
3506 INNER JOIN sessions s ON c.session_id = s.id
3507 WHERE s.project_path = ?1",
3508 [project_path],
3509 |row| row.get(0),
3510 )?;
3511
3512 Ok(ProjectCounts {
3513 sessions: sessions as usize,
3514 issues: issues as usize,
3515 context_items: context_items as usize,
3516 memories: memories as usize,
3517 checkpoints: checkpoints as usize,
3518 })
3519 }
3520
3521 pub fn upsert_session(&mut self, session: &Session) -> Result<()> {
3533 self.conn.execute(
3534 "INSERT INTO sessions (id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at)
3535 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
3536 ON CONFLICT(id) DO UPDATE SET
3537 name = excluded.name,
3538 description = excluded.description,
3539 branch = excluded.branch,
3540 channel = excluded.channel,
3541 project_path = excluded.project_path,
3542 status = excluded.status,
3543 ended_at = excluded.ended_at,
3544 updated_at = excluded.updated_at",
3545 rusqlite::params![
3546 session.id,
3547 session.name,
3548 session.description,
3549 session.branch,
3550 session.channel,
3551 session.project_path,
3552 session.status,
3553 session.ended_at,
3554 session.created_at,
3555 session.updated_at,
3556 ],
3557 )?;
3558 Ok(())
3559 }
3560
3561 pub fn upsert_issue(&mut self, issue: &Issue) -> Result<()> {
3567 self.conn.execute(
3568 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at)
3569 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
3570 ON CONFLICT(id) DO UPDATE SET
3571 short_id = excluded.short_id,
3572 project_path = excluded.project_path,
3573 title = excluded.title,
3574 description = excluded.description,
3575 details = excluded.details,
3576 status = excluded.status,
3577 priority = excluded.priority,
3578 issue_type = excluded.issue_type,
3579 plan_id = excluded.plan_id,
3580 assigned_to_agent = excluded.assigned_to_agent,
3581 updated_at = excluded.updated_at,
3582 closed_at = excluded.closed_at",
3583 rusqlite::params![
3584 issue.id,
3585 issue.short_id,
3586 issue.project_path,
3587 issue.title,
3588 issue.description,
3589 issue.details,
3590 issue.status,
3591 issue.priority,
3592 issue.issue_type,
3593 issue.plan_id,
3594 issue.created_by_agent,
3595 issue.assigned_to_agent,
3596 issue.created_at,
3597 issue.updated_at,
3598 issue.closed_at,
3599 ],
3600 )?;
3601 Ok(())
3602 }
3603
3604 pub fn upsert_context_item(&mut self, item: &ContextItem) -> Result<()> {
3610 self.conn.execute(
3611 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
3612 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3613 ON CONFLICT(id) DO UPDATE SET
3614 key = excluded.key,
3615 value = excluded.value,
3616 category = excluded.category,
3617 priority = excluded.priority,
3618 channel = excluded.channel,
3619 tags = excluded.tags,
3620 size = excluded.size,
3621 updated_at = excluded.updated_at",
3622 rusqlite::params![
3623 item.id,
3624 item.session_id,
3625 item.key,
3626 item.value,
3627 item.category,
3628 item.priority,
3629 item.channel,
3630 item.tags,
3631 item.size,
3632 item.created_at,
3633 item.updated_at,
3634 ],
3635 )?;
3636 Ok(())
3637 }
3638
3639 pub fn upsert_memory(&mut self, memory: &Memory) -> Result<()> {
3645 self.conn.execute(
3646 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
3647 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3648 ON CONFLICT(id) DO UPDATE SET
3649 key = excluded.key,
3650 value = excluded.value,
3651 category = excluded.category,
3652 updated_at = excluded.updated_at",
3653 rusqlite::params![
3654 memory.id,
3655 memory.project_path,
3656 memory.key,
3657 memory.value,
3658 memory.category,
3659 memory.created_at,
3660 memory.updated_at,
3661 ],
3662 )?;
3663 Ok(())
3664 }
3665
3666 pub fn upsert_checkpoint(&mut self, checkpoint: &Checkpoint) -> Result<()> {
3674 self.conn.execute(
3675 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
3676 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3677 ON CONFLICT(id) DO UPDATE SET
3678 name = excluded.name,
3679 description = excluded.description,
3680 git_status = excluded.git_status,
3681 git_branch = excluded.git_branch",
3682 rusqlite::params![
3683 checkpoint.id,
3684 checkpoint.session_id,
3685 checkpoint.name,
3686 checkpoint.description,
3687 checkpoint.git_status,
3688 checkpoint.git_branch,
3689 checkpoint.created_at,
3690 ],
3691 )?;
3692 Ok(())
3693 }
3694
3695 pub fn create_project(&mut self, project: &Project, actor: &str) -> Result<()> {
3705 self.mutate("create_project", actor, |tx, ctx| {
3706 tx.execute(
3707 "INSERT INTO projects (id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at)
3708 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
3709 rusqlite::params![
3710 project.id,
3711 project.project_path,
3712 project.name,
3713 project.description,
3714 project.issue_prefix,
3715 project.next_issue_number,
3716 project.plan_prefix,
3717 project.next_plan_number,
3718 project.created_at,
3719 project.updated_at,
3720 ],
3721 )?;
3722
3723 ctx.record_event("project", &project.id, EventType::ProjectCreated);
3724 Ok(())
3725 })
3726 }
3727
3728 pub fn get_project(&self, id: &str) -> Result<Option<Project>> {
3734 let project = self
3735 .conn
3736 .query_row(
3737 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3738 FROM projects WHERE id = ?1",
3739 [id],
3740 map_project_row,
3741 )
3742 .optional()?;
3743 Ok(project)
3744 }
3745
3746 pub fn get_project_by_path(&self, project_path: &str) -> Result<Option<Project>> {
3752 let project = self
3753 .conn
3754 .query_row(
3755 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3756 FROM projects WHERE project_path = ?1",
3757 [project_path],
3758 map_project_row,
3759 )
3760 .optional()?;
3761 Ok(project)
3762 }
3763
3764 pub fn list_projects(&self, limit: usize) -> Result<Vec<Project>> {
3770 let mut stmt = self.conn.prepare(
3771 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3772 FROM projects
3773 ORDER BY updated_at DESC
3774 LIMIT ?1",
3775 )?;
3776
3777 let projects = stmt
3778 .query_map([limit], map_project_row)?
3779 .collect::<std::result::Result<Vec<_>, _>>()?;
3780
3781 Ok(projects)
3782 }
3783
3784 pub fn update_project(
3790 &mut self,
3791 id: &str,
3792 name: Option<&str>,
3793 description: Option<&str>,
3794 issue_prefix: Option<&str>,
3795 actor: &str,
3796 ) -> Result<()> {
3797 self.mutate("update_project", actor, |tx, ctx| {
3798 let now = chrono::Utc::now().timestamp_millis();
3799
3800 let mut updates = vec!["updated_at = ?1"];
3802 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3803 let mut param_idx = 2;
3804
3805 if let Some(n) = name {
3806 updates.push(format!("name = ?{param_idx}").leak());
3807 params.push(Box::new(n.to_string()));
3808 param_idx += 1;
3809 }
3810
3811 if let Some(d) = description {
3812 updates.push(format!("description = ?{param_idx}").leak());
3813 params.push(Box::new(d.to_string()));
3814 param_idx += 1;
3815 }
3816
3817 if let Some(p) = issue_prefix {
3818 updates.push(format!("issue_prefix = ?{param_idx}").leak());
3819 params.push(Box::new(p.to_string()));
3820 param_idx += 1;
3821 }
3822
3823 params.push(Box::new(id.to_string()));
3825
3826 let sql = format!(
3827 "UPDATE projects SET {} WHERE id = ?{}",
3828 updates.join(", "),
3829 param_idx
3830 );
3831
3832 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3833 let affected = tx.execute(&sql, param_refs.as_slice())?;
3834
3835 if affected == 0 {
3836 return Err(Error::ProjectNotFound { id: id.to_string() });
3837 }
3838
3839 ctx.record_event("project", id, EventType::ProjectUpdated);
3840 Ok(())
3841 })
3842 }
3843
3844 pub fn delete_project(&mut self, id: &str, actor: &str) -> Result<()> {
3856 self.mutate("delete_project", actor, |tx, ctx| {
3857 let project_path: Option<String> = tx
3859 .query_row(
3860 "SELECT project_path FROM projects WHERE id = ?1",
3861 [id],
3862 |row| row.get(0),
3863 )
3864 .optional()?;
3865
3866 let project_path = project_path.ok_or_else(|| Error::ProjectNotFound { id: id.to_string() })?;
3867
3868 tx.execute(
3870 "DELETE FROM sessions WHERE project_path = ?1",
3871 [&project_path],
3872 )?;
3873
3874 tx.execute(
3876 "DELETE FROM issues WHERE project_path = ?1",
3877 [&project_path],
3878 )?;
3879
3880 tx.execute(
3882 "DELETE FROM plans WHERE project_path = ?1",
3883 [&project_path],
3884 )?;
3885
3886 tx.execute(
3888 "DELETE FROM project_memory WHERE project_path = ?1",
3889 [&project_path],
3890 )?;
3891
3892 let affected = tx.execute("DELETE FROM projects WHERE id = ?1", [id])?;
3894
3895 if affected == 0 {
3896 return Err(Error::ProjectNotFound { id: id.to_string() });
3897 }
3898
3899 ctx.record_event("project", id, EventType::ProjectDeleted);
3900 Ok(())
3901 })
3902 }
3903
3904 pub fn get_or_create_project(&mut self, project_path: &str, actor: &str) -> Result<Project> {
3913 if let Some(project) = self.get_project_by_path(project_path)? {
3915 return Ok(project);
3916 }
3917
3918 let name = std::path::Path::new(project_path)
3920 .file_name()
3921 .and_then(|n| n.to_str())
3922 .unwrap_or("Unknown Project")
3923 .to_string();
3924
3925 let project = Project::new(project_path.to_string(), name);
3926 self.create_project(&project, actor)?;
3927 Ok(project)
3928 }
3929
3930 pub fn get_next_issue_number(&mut self, project_path: &str) -> Result<i32> {
3936 let project = self
3937 .get_project_by_path(project_path)?
3938 .ok_or_else(|| Error::ProjectNotFound { id: project_path.to_string() })?;
3939
3940 let next_num = project.next_issue_number;
3941
3942 self.conn.execute(
3944 "UPDATE projects SET next_issue_number = next_issue_number + 1, updated_at = ?1 WHERE project_path = ?2",
3945 rusqlite::params![chrono::Utc::now().timestamp_millis(), project_path],
3946 )?;
3947
3948 Ok(next_num)
3949 }
3950
3951 pub fn create_plan(&mut self, plan: &Plan, actor: &str) -> Result<()> {
3961 self.mutate("create_plan", actor, |tx, ctx| {
3962 tx.execute(
3963 "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, source_path, source_hash, created_at, updated_at)
3964 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
3965 rusqlite::params![
3966 plan.id,
3967 plan.short_id,
3968 plan.project_id,
3969 plan.project_path,
3970 plan.title,
3971 plan.content,
3972 plan.status.as_str(),
3973 plan.success_criteria,
3974 plan.session_id,
3975 plan.created_in_session,
3976 plan.source_path,
3977 plan.source_hash,
3978 plan.created_at,
3979 plan.updated_at,
3980 ],
3981 )?;
3982
3983 ctx.record_event("plan", &plan.id, EventType::PlanCreated);
3984 Ok(())
3985 })
3986 }
3987
3988 pub fn get_plan(&self, id: &str) -> Result<Option<Plan>> {
3994 let plan = self
3995 .conn
3996 .query_row(
3997 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3998 FROM plans WHERE id = ?1",
3999 [id],
4000 map_plan_row,
4001 )
4002 .optional()?;
4003 Ok(plan)
4004 }
4005
4006 pub fn list_plans(&self, project_path: &str, status: Option<&str>, limit: usize) -> Result<Vec<Plan>> {
4012 let sql = if let Some(status) = status {
4013 if status == "all" {
4014 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4015 FROM plans WHERE project_path = ?1
4016 ORDER BY updated_at DESC
4017 LIMIT ?2".to_string()
4018 } else {
4019 format!(
4020 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4021 FROM plans WHERE project_path = ?1 AND status = '{}'
4022 ORDER BY updated_at DESC
4023 LIMIT ?2",
4024 status
4025 )
4026 }
4027 } else {
4028 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4030 FROM plans WHERE project_path = ?1 AND status = 'active'
4031 ORDER BY updated_at DESC
4032 LIMIT ?2".to_string()
4033 };
4034
4035 let mut stmt = self.conn.prepare(&sql)?;
4036 let plans = stmt
4037 .query_map(rusqlite::params![project_path, limit], map_plan_row)?
4038 .collect::<std::result::Result<Vec<_>, _>>()?;
4039
4040 Ok(plans)
4041 }
4042
4043 pub fn update_plan(
4049 &mut self,
4050 id: &str,
4051 title: Option<&str>,
4052 content: Option<&str>,
4053 status: Option<&str>,
4054 success_criteria: Option<&str>,
4055 actor: &str,
4056 ) -> Result<()> {
4057 self.mutate("update_plan", actor, |tx, ctx| {
4058 let now = chrono::Utc::now().timestamp_millis();
4059
4060 let mut updates = vec!["updated_at = ?1"];
4062 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
4063 let mut param_idx = 2;
4064
4065 if let Some(t) = title {
4066 updates.push(format!("title = ?{param_idx}").leak());
4067 params.push(Box::new(t.to_string()));
4068 param_idx += 1;
4069 }
4070
4071 if let Some(c) = content {
4072 updates.push(format!("content = ?{param_idx}").leak());
4073 params.push(Box::new(c.to_string()));
4074 param_idx += 1;
4075 }
4076
4077 if let Some(s) = status {
4078 updates.push(format!("status = ?{param_idx}").leak());
4079 params.push(Box::new(s.to_string()));
4080 param_idx += 1;
4081
4082 if s == "completed" {
4084 updates.push(format!("completed_at = ?{param_idx}").leak());
4085 params.push(Box::new(now));
4086 param_idx += 1;
4087 }
4088 }
4089
4090 if let Some(sc) = success_criteria {
4091 updates.push(format!("success_criteria = ?{param_idx}").leak());
4092 params.push(Box::new(sc.to_string()));
4093 param_idx += 1;
4094 }
4095
4096 params.push(Box::new(id.to_string()));
4098
4099 let sql = format!(
4100 "UPDATE plans SET {} WHERE id = ?{}",
4101 updates.join(", "),
4102 param_idx
4103 );
4104
4105 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4106 let affected = tx.execute(&sql, param_refs.as_slice())?;
4107
4108 if affected == 0 {
4109 return Err(Error::Other(format!("Plan not found: {id}")));
4110 }
4111
4112 let event_type = if status == Some("completed") {
4113 EventType::PlanCompleted
4114 } else {
4115 EventType::PlanUpdated
4116 };
4117 ctx.record_event("plan", id, event_type);
4118 Ok(())
4119 })
4120 }
4121
4122 pub fn get_plans_by_project(&self, project_path: &str) -> Result<Vec<Plan>> {
4128 let mut stmt = self.conn.prepare(
4129 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4130 FROM plans WHERE project_path = ?1 ORDER BY created_at ASC",
4131 )?;
4132 let rows = stmt.query_map([project_path], map_plan_row)?;
4133 let plans: Vec<Plan> = rows.collect::<std::result::Result<_, _>>()?;
4134 Ok(plans)
4135 }
4136
4137 pub fn find_plan_by_source_hash(&self, source_hash: &str) -> Result<Option<Plan>> {
4143 let plan = self
4144 .conn
4145 .query_row(
4146 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4147 FROM plans WHERE source_hash = ?1 LIMIT 1",
4148 [source_hash],
4149 map_plan_row,
4150 )
4151 .optional()?;
4152 Ok(plan)
4153 }
4154
4155 pub fn upsert_plan(&mut self, plan: &Plan) -> Result<()> {
4161 self.conn.execute(
4162 "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at)
4163 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
4164 ON CONFLICT(id) DO UPDATE SET
4165 short_id = excluded.short_id,
4166 title = excluded.title,
4167 content = excluded.content,
4168 status = excluded.status,
4169 success_criteria = excluded.success_criteria,
4170 session_id = excluded.session_id,
4171 source_path = excluded.source_path,
4172 source_hash = excluded.source_hash,
4173 updated_at = excluded.updated_at,
4174 completed_at = excluded.completed_at",
4175 rusqlite::params![
4176 plan.id,
4177 plan.short_id,
4178 plan.project_id,
4179 plan.project_path,
4180 plan.title,
4181 plan.content,
4182 plan.status.as_str(),
4183 plan.success_criteria,
4184 plan.session_id,
4185 plan.created_in_session,
4186 plan.completed_in_session,
4187 plan.source_path,
4188 plan.source_hash,
4189 plan.created_at,
4190 plan.updated_at,
4191 plan.completed_at,
4192 ],
4193 )?;
4194 Ok(())
4195 }
4196
4197 pub fn get_dirty_plans_by_project(&self, project_path: &str) -> Result<Vec<String>> {
4203 let mut stmt = self.conn.prepare(
4204 "SELECT dp.plan_id
4205 FROM dirty_plans dp
4206 INNER JOIN plans p ON dp.plan_id = p.id
4207 WHERE p.project_path = ?1",
4208 )?;
4209 let rows = stmt.query_map([project_path], |row| row.get(0))?;
4210 Ok(rows.collect::<std::result::Result<_, _>>()?)
4211 }
4212
4213 pub fn clear_dirty_plans(&mut self, ids: &[String]) -> Result<()> {
4219 if ids.is_empty() {
4220 return Ok(());
4221 }
4222 let placeholders = vec!["?"; ids.len()].join(",");
4223 let sql = format!("DELETE FROM dirty_plans WHERE plan_id IN ({placeholders})");
4224 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
4225 self.conn.execute(&sql, params.as_slice())?;
4226 Ok(())
4227 }
4228
4229 pub fn create_time_entry(
4235 &mut self,
4236 id: &str,
4237 short_id: Option<&str>,
4238 project_path: &str,
4239 hours: f64,
4240 description: &str,
4241 work_date: &str,
4242 issue_id: Option<&str>,
4243 period: Option<&str>,
4244 actor: &str,
4245 ) -> Result<()> {
4246 let now = chrono::Utc::now().timestamp_millis();
4247
4248 self.mutate("create_time_entry", actor, |tx, ctx| {
4249 tx.execute(
4250 "INSERT INTO time_entries (id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at)
4251 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'logged', ?9, ?10, ?10)",
4252 rusqlite::params![id, short_id, project_path, issue_id, period, hours, description, work_date, actor, now],
4253 )?;
4254
4255 ctx.record_event("time_entry", id, EventType::TimeEntryCreated);
4256 ctx.mark_time_entry_dirty(id);
4257
4258 Ok(())
4259 })
4260 }
4261
4262 pub fn get_time_entry(&self, id: &str, project_path: Option<&str>) -> Result<Option<TimeEntry>> {
4264 let sql = if project_path.is_some() {
4265 "SELECT id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at
4266 FROM time_entries WHERE (id = ?1 OR short_id = ?1) AND project_path = ?2"
4267 } else {
4268 "SELECT id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at
4269 FROM time_entries WHERE id = ?1 OR short_id = ?1"
4270 };
4271
4272 let mut stmt = self.conn.prepare(sql)?;
4273
4274 let entry = if let Some(path) = project_path {
4275 stmt.query_row(rusqlite::params![id, path], map_time_entry_row)
4276 } else {
4277 stmt.query_row(rusqlite::params![id], map_time_entry_row)
4278 }
4279 .optional()?;
4280
4281 Ok(entry)
4282 }
4283
4284 pub fn list_time_entries(
4286 &self,
4287 project_path: &str,
4288 period: Option<&str>,
4289 status: Option<&str>,
4290 issue_id: Option<&str>,
4291 date_from: Option<&str>,
4292 date_to: Option<&str>,
4293 limit: Option<u32>,
4294 ) -> Result<Vec<TimeEntry>> {
4295 let mut conditions = vec!["project_path = ?1".to_string()];
4296 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
4297 let mut idx = 2;
4298
4299 if let Some(p) = period {
4300 conditions.push(format!("period = ?{idx}"));
4301 params.push(Box::new(p.to_string()));
4302 idx += 1;
4303 }
4304 if let Some(s) = status {
4305 conditions.push(format!("status = ?{idx}"));
4306 params.push(Box::new(s.to_string()));
4307 idx += 1;
4308 }
4309 if let Some(iid) = issue_id {
4310 conditions.push(format!("issue_id = ?{idx}"));
4311 params.push(Box::new(iid.to_string()));
4312 idx += 1;
4313 }
4314 if let Some(df) = date_from {
4315 conditions.push(format!("work_date >= ?{idx}"));
4316 params.push(Box::new(df.to_string()));
4317 idx += 1;
4318 }
4319 if let Some(dt) = date_to {
4320 conditions.push(format!("work_date <= ?{idx}"));
4321 params.push(Box::new(dt.to_string()));
4322 idx += 1;
4323 }
4324
4325 let limit_val = limit.unwrap_or(200);
4326 conditions.push(format!("1=1"));
4327 let where_clause = conditions.join(" AND ");
4328 let sql = format!(
4329 "SELECT id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at
4330 FROM time_entries WHERE {where_clause} ORDER BY work_date DESC, created_at DESC LIMIT ?{idx}"
4331 );
4332
4333 params.push(Box::new(limit_val));
4334
4335 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4336 let mut stmt = self.conn.prepare(&sql)?;
4337 let rows = stmt.query_map(param_refs.as_slice(), map_time_entry_row)?;
4338 let entries: Vec<TimeEntry> = rows.collect::<std::result::Result<_, _>>()?;
4339 Ok(entries)
4340 }
4341
4342 pub fn update_time_entry(
4344 &mut self,
4345 id: &str,
4346 project_path: &str,
4347 hours: Option<f64>,
4348 description: Option<&str>,
4349 period: Option<&str>,
4350 issue_id: Option<&str>,
4351 work_date: Option<&str>,
4352 actor: &str,
4353 ) -> Result<()> {
4354 let full_id = self.resolve_time_entry_id(id, Some(project_path))?;
4356 let now = chrono::Utc::now().timestamp_millis();
4357
4358 self.mutate("update_time_entry", actor, |tx, ctx| {
4359 let mut sets = vec!["updated_at = ?1".to_string()];
4360 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
4361 let mut idx = 2;
4362
4363 if let Some(h) = hours {
4364 sets.push(format!("hours = ?{idx}"));
4365 params.push(Box::new(h));
4366 idx += 1;
4367 }
4368 if let Some(d) = description {
4369 sets.push(format!("description = ?{idx}"));
4370 params.push(Box::new(d.to_string()));
4371 idx += 1;
4372 }
4373 if let Some(p) = period {
4374 sets.push(format!("period = ?{idx}"));
4375 params.push(Box::new(p.to_string()));
4376 idx += 1;
4377 }
4378 if let Some(iid) = issue_id {
4379 sets.push(format!("issue_id = ?{idx}"));
4380 params.push(Box::new(iid.to_string()));
4381 idx += 1;
4382 }
4383 if let Some(wd) = work_date {
4384 sets.push(format!("work_date = ?{idx}"));
4385 params.push(Box::new(wd.to_string()));
4386 idx += 1;
4387 }
4388
4389 let set_clause = sets.join(", ");
4390 let sql = format!("UPDATE time_entries SET {set_clause} WHERE id = ?{idx}");
4391 params.push(Box::new(full_id.clone()));
4392
4393 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4394 tx.execute(&sql, param_refs.as_slice())?;
4395
4396 ctx.record_event("time_entry", &full_id, EventType::TimeEntryUpdated);
4397 ctx.mark_time_entry_dirty(&full_id);
4398
4399 Ok(())
4400 })
4401 }
4402
4403 pub fn update_time_entry_status(
4405 &mut self,
4406 id: &str,
4407 project_path: &str,
4408 status: &str,
4409 actor: &str,
4410 ) -> Result<()> {
4411 let full_id = self.resolve_time_entry_id(id, Some(project_path))?;
4412 let now = chrono::Utc::now().timestamp_millis();
4413
4414 self.mutate("update_time_entry_status", actor, |tx, ctx| {
4415 tx.execute(
4416 "UPDATE time_entries SET status = ?1, updated_at = ?2 WHERE id = ?3",
4417 rusqlite::params![status, now, full_id],
4418 )?;
4419
4420 ctx.record_event("time_entry", &full_id, EventType::TimeEntryStatusChanged);
4421 ctx.mark_time_entry_dirty(&full_id);
4422
4423 Ok(())
4424 })
4425 }
4426
4427 pub fn invoice_time_entries(
4430 &mut self,
4431 project_path: &str,
4432 period: &str,
4433 from_status: &str,
4434 to_status: &str,
4435 actor: &str,
4436 ) -> Result<(usize, f64)> {
4437 let now = chrono::Utc::now().timestamp_millis();
4438
4439 let entries = self.list_time_entries(project_path, Some(period), Some(from_status), None, None, None, None)?;
4441 let count = entries.len();
4442 let total_hours: f64 = entries.iter().map(|e| e.hours).sum();
4443
4444 if count == 0 {
4445 return Ok((0, 0.0));
4446 }
4447
4448 self.mutate("invoice_time_entries", actor, |tx, ctx| {
4449 tx.execute(
4450 "UPDATE time_entries SET status = ?1, updated_at = ?2 WHERE project_path = ?3 AND period = ?4 AND status = ?5",
4451 rusqlite::params![to_status, now, project_path, period, from_status],
4452 )?;
4453
4454 for entry in &entries {
4455 ctx.record_event("time_entry", &entry.id, EventType::TimeEntryStatusChanged);
4456 ctx.mark_time_entry_dirty(&entry.id);
4457 }
4458
4459 Ok(())
4460 })?;
4461
4462 Ok((count, total_hours))
4463 }
4464
4465 pub fn delete_time_entry(
4467 &mut self,
4468 id: &str,
4469 project_path: &str,
4470 actor: &str,
4471 ) -> Result<()> {
4472 let full_id = self.resolve_time_entry_id(id, Some(project_path))?;
4473
4474 self.mutate("delete_time_entry", actor, |tx, ctx| {
4475 tx.execute(
4476 "DELETE FROM time_entries WHERE id = ?1",
4477 rusqlite::params![full_id],
4478 )?;
4479
4480 tx.execute(
4482 "DELETE FROM dirty_time_entries WHERE time_entry_id = ?1",
4483 rusqlite::params![full_id],
4484 )?;
4485
4486 ctx.record_event("time_entry", &full_id, EventType::TimeEntryDeleted);
4487
4488 Ok(())
4489 })
4490 }
4491
4492 pub fn get_time_total(
4494 &self,
4495 project_path: &str,
4496 period: Option<&str>,
4497 status: Option<&str>,
4498 ) -> Result<f64> {
4499 let mut conditions = vec!["project_path = ?1".to_string()];
4500 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
4501 let mut idx = 2;
4502
4503 if let Some(p) = period {
4504 conditions.push(format!("period = ?{idx}"));
4505 params.push(Box::new(p.to_string()));
4506 idx += 1;
4507 }
4508 if let Some(s) = status {
4509 conditions.push(format!("status = ?{idx}"));
4510 params.push(Box::new(s.to_string()));
4511 let _ = idx;
4512 }
4513
4514 let where_clause = conditions.join(" AND ");
4515 let sql = format!("SELECT COALESCE(SUM(hours), 0) FROM time_entries WHERE {where_clause}");
4516
4517 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4518 let total: f64 = self.conn.query_row(&sql, param_refs.as_slice(), |row| row.get(0))?;
4519 Ok(total)
4520 }
4521
4522 pub fn get_issue_time_total(&self, issue_id: &str) -> Result<f64> {
4524 let total: f64 = self.conn.query_row(
4525 "SELECT COALESCE(SUM(hours), 0) FROM time_entries WHERE issue_id = ?1",
4526 rusqlite::params![issue_id],
4527 |row| row.get(0),
4528 )?;
4529 Ok(total)
4530 }
4531
4532 pub fn get_time_entries_by_project(&self, project_path: &str) -> Result<Vec<TimeEntry>> {
4534 let mut stmt = self.conn.prepare(
4535 "SELECT id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at
4536 FROM time_entries WHERE project_path = ?1 ORDER BY created_at ASC",
4537 )?;
4538 let rows = stmt.query_map([project_path], map_time_entry_row)?;
4539 let entries: Vec<TimeEntry> = rows.collect::<std::result::Result<_, _>>()?;
4540 Ok(entries)
4541 }
4542
4543 fn resolve_time_entry_id(&self, id: &str, project_path: Option<&str>) -> Result<String> {
4545 let entry = self.get_time_entry(id, project_path)?
4546 .ok_or_else(|| Error::InvalidArgument(format!("Time entry not found: {id}")))?;
4547 Ok(entry.id)
4548 }
4549
4550 pub fn get_dirty_time_entries_by_project(&self, project_path: &str) -> Result<Vec<String>> {
4552 let mut stmt = self.conn.prepare(
4553 "SELECT dte.time_entry_id
4554 FROM dirty_time_entries dte
4555 INNER JOIN time_entries te ON dte.time_entry_id = te.id
4556 WHERE te.project_path = ?1",
4557 )?;
4558 let rows = stmt.query_map([project_path], |row| row.get(0))?;
4559 Ok(rows.collect::<std::result::Result<_, _>>()?)
4560 }
4561
4562 pub fn clear_dirty_time_entries(&mut self, ids: &[String]) -> Result<()> {
4564 if ids.is_empty() {
4565 return Ok(());
4566 }
4567 let placeholders = vec!["?"; ids.len()].join(",");
4568 let sql = format!("DELETE FROM dirty_time_entries WHERE time_entry_id IN ({placeholders})");
4569 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
4570 self.conn.execute(&sql, params.as_slice())?;
4571 Ok(())
4572 }
4573
4574 pub fn upsert_time_entry(&mut self, entry: &TimeEntry) -> Result<()> {
4576 self.conn.execute(
4577 "INSERT INTO time_entries (id, short_id, project_path, issue_id, period, hours, description, work_date, status, actor, created_at, updated_at)
4578 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
4579 ON CONFLICT(id) DO UPDATE SET
4580 short_id = excluded.short_id,
4581 issue_id = excluded.issue_id,
4582 period = excluded.period,
4583 hours = excluded.hours,
4584 description = excluded.description,
4585 work_date = excluded.work_date,
4586 status = excluded.status,
4587 actor = excluded.actor,
4588 updated_at = excluded.updated_at",
4589 rusqlite::params![
4590 entry.id,
4591 entry.short_id,
4592 entry.project_path,
4593 entry.issue_id,
4594 entry.period,
4595 entry.hours,
4596 entry.description,
4597 entry.work_date,
4598 entry.status,
4599 entry.actor,
4600 entry.created_at,
4601 entry.updated_at,
4602 ],
4603 )?;
4604 Ok(())
4605 }
4606
4607 pub fn store_embedding_chunk(
4620 &mut self,
4621 id: &str,
4622 item_id: &str,
4623 chunk_index: i32,
4624 chunk_text: &str,
4625 embedding: &[f32],
4626 provider: &str,
4627 model: &str,
4628 ) -> Result<()> {
4629 let now = chrono::Utc::now().timestamp_millis();
4630 let dimensions = embedding.len() as i32;
4631
4632 let blob: Vec<u8> = embedding
4634 .iter()
4635 .flat_map(|f| f.to_le_bytes())
4636 .collect();
4637
4638 self.conn.execute(
4639 "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4640 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
4641 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4642 chunk_text = excluded.chunk_text,
4643 embedding = excluded.embedding,
4644 dimensions = excluded.dimensions,
4645 provider = excluded.provider,
4646 model = excluded.model,
4647 created_at = excluded.created_at",
4648 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, provider, model, now],
4649 )?;
4650
4651 self.conn.execute(
4653 "UPDATE context_items SET
4654 embedding_status = 'complete',
4655 embedding_provider = ?1,
4656 embedding_model = ?2,
4657 chunk_count = COALESCE(
4658 (SELECT MAX(chunk_index) + 1 FROM embedding_chunks WHERE item_id = ?3),
4659 1
4660 ),
4661 embedded_at = ?4
4662 WHERE id = ?3",
4663 rusqlite::params![provider, model, item_id, now],
4664 )?;
4665
4666 Ok(())
4667 }
4668
4669 pub fn get_embedding_chunks(&self, item_id: &str) -> Result<Vec<EmbeddingChunk>> {
4675 let mut stmt = self.conn.prepare(
4676 "SELECT id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at
4677 FROM embedding_chunks
4678 WHERE item_id = ?1
4679 ORDER BY chunk_index ASC",
4680 )?;
4681
4682 let rows = stmt.query_map([item_id], |row| {
4683 let blob: Vec<u8> = row.get(4)?;
4684 let dimensions: i32 = row.get(5)?;
4685
4686 let embedding: Vec<f32> = blob
4688 .chunks_exact(4)
4689 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4690 .collect();
4691
4692 Ok(EmbeddingChunk {
4693 id: row.get(0)?,
4694 item_id: row.get(1)?,
4695 chunk_index: row.get(2)?,
4696 chunk_text: row.get(3)?,
4697 embedding,
4698 dimensions: dimensions as usize,
4699 provider: row.get(6)?,
4700 model: row.get(7)?,
4701 created_at: row.get(8)?,
4702 })
4703 })?;
4704
4705 rows.collect::<std::result::Result<Vec<_>, _>>()
4706 .map_err(Error::from)
4707 }
4708
4709 pub fn get_items_without_embeddings(
4715 &self,
4716 session_id: Option<&str>,
4717 limit: Option<u32>,
4718 ) -> Result<Vec<ContextItem>> {
4719 let limit = limit.unwrap_or(100);
4720
4721 let sql = if let Some(sid) = session_id {
4722 format!(
4723 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4724 FROM context_items
4725 WHERE session_id = '{}' AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))
4726 ORDER BY created_at DESC
4727 LIMIT {}",
4728 sid, limit
4729 )
4730 } else {
4731 format!(
4732 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4733 FROM context_items
4734 WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')
4735 ORDER BY created_at DESC
4736 LIMIT {}",
4737 limit
4738 )
4739 };
4740
4741 let mut stmt = self.conn.prepare(&sql)?;
4742 let rows = stmt.query_map([], |row| {
4743 Ok(ContextItem {
4744 id: row.get(0)?,
4745 session_id: row.get(1)?,
4746 key: row.get(2)?,
4747 value: row.get(3)?,
4748 category: row.get(4)?,
4749 priority: row.get(5)?,
4750 channel: row.get(6)?,
4751 tags: row.get(7)?,
4752 size: row.get(8)?,
4753 created_at: row.get(9)?,
4754 updated_at: row.get(10)?,
4755 })
4756 })?;
4757
4758 rows.collect::<std::result::Result<Vec<_>, _>>()
4759 .map_err(Error::from)
4760 }
4761
4762 pub fn count_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4768 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4769 let with: i64 = self.conn.query_row(
4770 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND embedding_status = 'complete'",
4771 [sid],
4772 |row| row.get(0),
4773 )?;
4774 let without: i64 = self.conn.query_row(
4775 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))",
4776 [sid],
4777 |row| row.get(0),
4778 )?;
4779 (with, without)
4780 } else {
4781 let with: i64 = self.conn.query_row(
4782 "SELECT COUNT(*) FROM context_items WHERE embedding_status = 'complete'",
4783 [],
4784 |row| row.get(0),
4785 )?;
4786 let without: i64 = self.conn.query_row(
4787 "SELECT COUNT(*) FROM context_items WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')",
4788 [],
4789 |row| row.get(0),
4790 )?;
4791 (with, without)
4792 };
4793
4794 Ok(EmbeddingStats {
4795 with_embeddings: with_embeddings as usize,
4796 without_embeddings: without_embeddings as usize,
4797 })
4798 }
4799
4800 pub fn resync_embedding_status(&self) -> Result<usize> {
4812 let count = self.conn.execute(
4813 "UPDATE context_items SET embedding_status = 'pending'
4814 WHERE embedding_status = 'complete'
4815 AND id NOT IN (SELECT DISTINCT item_id FROM embedding_chunks)",
4816 [],
4817 )?;
4818 Ok(count)
4819 }
4820
4821 pub fn semantic_search(
4831 &self,
4832 query_embedding: &[f32],
4833 session_id: Option<&str>,
4834 limit: usize,
4835 threshold: f32,
4836 ) -> Result<Vec<SemanticSearchResult>> {
4837 let sql = if let Some(sid) = session_id {
4839 format!(
4840 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4841 ci.key, ci.value, ci.category, ci.priority
4842 FROM embedding_chunks ec
4843 INNER JOIN context_items ci ON ec.item_id = ci.id
4844 WHERE ci.session_id = '{}'",
4845 sid
4846 )
4847 } else {
4848 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4849 ci.key, ci.value, ci.category, ci.priority
4850 FROM embedding_chunks ec
4851 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4852 };
4853
4854 let mut stmt = self.conn.prepare(&sql)?;
4855 let rows = stmt.query_map([], |row| {
4856 let blob: Vec<u8> = row.get(4)?;
4857 let embedding: Vec<f32> = blob
4858 .chunks_exact(4)
4859 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4860 .collect();
4861
4862 Ok((
4863 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
4867 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
4872 })?;
4873
4874 let mut results: Vec<SemanticSearchResult> = rows
4876 .filter_map(|row| row.ok())
4877 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4878 let similarity = cosine_similarity(query_embedding, &embedding);
4879 SemanticSearchResult {
4880 item_id,
4881 chunk_index,
4882 chunk_text,
4883 similarity,
4884 key,
4885 value,
4886 category,
4887 priority,
4888 }
4889 })
4890 .filter(|r| r.similarity >= threshold)
4891 .collect();
4892
4893 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4895
4896 let mut seen_items = std::collections::HashSet::new();
4898 let deduped: Vec<SemanticSearchResult> = results
4899 .into_iter()
4900 .filter(|r| seen_items.insert(r.item_id.clone()))
4901 .take(limit)
4902 .collect();
4903
4904 Ok(deduped)
4905 }
4906
4907 pub fn delete_embeddings(&mut self, item_id: &str) -> Result<()> {
4913 self.conn.execute(
4914 "DELETE FROM embedding_chunks WHERE item_id = ?1",
4915 [item_id],
4916 )?;
4917
4918 self.conn.execute(
4919 "UPDATE context_items SET
4920 embedding_status = 'none',
4921 embedding_provider = NULL,
4922 embedding_model = NULL,
4923 chunk_count = 0,
4924 embedded_at = NULL
4925 WHERE id = ?1",
4926 [item_id],
4927 )?;
4928
4929 Ok(())
4930 }
4931
4932 pub fn get_embedding_meta(&self, key: &str) -> Result<Option<String>> {
4938 let value = self.conn.query_row(
4939 "SELECT value FROM embeddings_meta WHERE key = ?1",
4940 [key],
4941 |row| row.get(0),
4942 ).optional()?;
4943 Ok(value)
4944 }
4945
4946 pub fn set_embedding_meta(&mut self, key: &str, value: &str) -> Result<()> {
4952 let now = chrono::Utc::now().timestamp_millis();
4953 self.conn.execute(
4954 "INSERT INTO embeddings_meta (key, value, updated_at)
4955 VALUES (?1, ?2, ?3)
4956 ON CONFLICT(key) DO UPDATE SET
4957 value = excluded.value,
4958 updated_at = excluded.updated_at",
4959 rusqlite::params![key, value, now],
4960 )?;
4961 Ok(())
4962 }
4963
4964 pub fn store_fast_embedding_chunk(
4977 &mut self,
4978 id: &str,
4979 item_id: &str,
4980 chunk_index: i32,
4981 chunk_text: &str,
4982 embedding: &[f32],
4983 model: &str,
4984 ) -> Result<()> {
4985 let now = chrono::Utc::now().timestamp_millis();
4986 let dimensions = embedding.len() as i32;
4987
4988 let blob: Vec<u8> = embedding
4990 .iter()
4991 .flat_map(|f| f.to_le_bytes())
4992 .collect();
4993
4994 self.conn.execute(
4995 "INSERT INTO embedding_chunks_fast (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4996 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'model2vec', ?7, ?8)
4997 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4998 chunk_text = excluded.chunk_text,
4999 embedding = excluded.embedding,
5000 dimensions = excluded.dimensions,
5001 model = excluded.model,
5002 created_at = excluded.created_at",
5003 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, model, now],
5004 )?;
5005
5006 self.conn.execute(
5008 "UPDATE context_items SET
5009 fast_embedding_status = 'complete',
5010 fast_embedded_at = ?1
5011 WHERE id = ?2",
5012 rusqlite::params![now, item_id],
5013 )?;
5014
5015 Ok(())
5016 }
5017
5018 pub fn search_fast_tier(
5027 &self,
5028 query_embedding: &[f32],
5029 session_id: Option<&str>,
5030 limit: usize,
5031 threshold: f32,
5032 ) -> Result<Vec<SemanticSearchResult>> {
5033 let sql = if let Some(sid) = session_id {
5035 format!(
5036 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
5037 ci.key, ci.value, ci.category, ci.priority
5038 FROM embedding_chunks_fast ec
5039 INNER JOIN context_items ci ON ec.item_id = ci.id
5040 WHERE ci.session_id = '{}'",
5041 sid
5042 )
5043 } else {
5044 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
5045 ci.key, ci.value, ci.category, ci.priority
5046 FROM embedding_chunks_fast ec
5047 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
5048 };
5049
5050 let mut stmt = self.conn.prepare(&sql)?;
5051 let rows = stmt.query_map([], |row| {
5052 let blob: Vec<u8> = row.get(4)?;
5053 let embedding: Vec<f32> = blob
5054 .chunks_exact(4)
5055 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
5056 .collect();
5057
5058 Ok((
5059 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
5063 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
5068 })?;
5069
5070 let mut results: Vec<SemanticSearchResult> = rows
5072 .filter_map(|row| row.ok())
5073 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
5074 let similarity = cosine_similarity(query_embedding, &embedding);
5075 SemanticSearchResult {
5076 item_id,
5077 chunk_index,
5078 chunk_text,
5079 similarity,
5080 key,
5081 value,
5082 category,
5083 priority,
5084 }
5085 })
5086 .filter(|r| r.similarity >= threshold)
5087 .collect();
5088
5089 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
5091
5092 let mut seen_items = std::collections::HashSet::new();
5094 let deduped: Vec<SemanticSearchResult> = results
5095 .into_iter()
5096 .filter(|r| seen_items.insert(r.item_id.clone()))
5097 .take(limit)
5098 .collect();
5099
5100 Ok(deduped)
5101 }
5102
5103 pub fn get_items_needing_quality_upgrade(
5111 &self,
5112 session_id: Option<&str>,
5113 limit: Option<u32>,
5114 ) -> Result<Vec<ContextItem>> {
5115 let limit = limit.unwrap_or(100);
5116
5117 let sql = if let Some(sid) = session_id {
5118 format!(
5119 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
5120 FROM context_items
5121 WHERE session_id = '{}'
5122 AND fast_embedding_status = 'complete'
5123 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
5124 ORDER BY created_at DESC
5125 LIMIT {}",
5126 sid, limit
5127 )
5128 } else {
5129 format!(
5130 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
5131 FROM context_items
5132 WHERE fast_embedding_status = 'complete'
5133 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
5134 ORDER BY created_at DESC
5135 LIMIT {}",
5136 limit
5137 )
5138 };
5139
5140 let mut stmt = self.conn.prepare(&sql)?;
5141 let rows = stmt.query_map([], |row| {
5142 Ok(ContextItem {
5143 id: row.get(0)?,
5144 session_id: row.get(1)?,
5145 key: row.get(2)?,
5146 value: row.get(3)?,
5147 category: row.get(4)?,
5148 priority: row.get(5)?,
5149 channel: row.get(6)?,
5150 tags: row.get(7)?,
5151 size: row.get(8)?,
5152 created_at: row.get(9)?,
5153 updated_at: row.get(10)?,
5154 })
5155 })?;
5156
5157 rows.collect::<std::result::Result<Vec<_>, _>>()
5158 .map_err(Error::from)
5159 }
5160
5161 pub fn delete_fast_embeddings(&mut self, item_id: &str) -> Result<()> {
5167 self.conn.execute(
5168 "DELETE FROM embedding_chunks_fast WHERE item_id = ?1",
5169 [item_id],
5170 )?;
5171
5172 self.conn.execute(
5173 "UPDATE context_items SET
5174 fast_embedding_status = 'none',
5175 fast_embedded_at = NULL
5176 WHERE id = ?1",
5177 [item_id],
5178 )?;
5179
5180 Ok(())
5181 }
5182
5183 pub fn count_fast_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
5189 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
5190 let with: i64 = self.conn.query_row(
5191 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND fast_embedding_status = 'complete'",
5192 [sid],
5193 |row| row.get(0),
5194 )?;
5195 let without: i64 = self.conn.query_row(
5196 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (fast_embedding_status IS NULL OR fast_embedding_status = 'none')",
5197 [sid],
5198 |row| row.get(0),
5199 )?;
5200 (with, without)
5201 } else {
5202 let with: i64 = self.conn.query_row(
5203 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status = 'complete'",
5204 [],
5205 |row| row.get(0),
5206 )?;
5207 let without: i64 = self.conn.query_row(
5208 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status IS NULL OR fast_embedding_status = 'none'",
5209 [],
5210 |row| row.get(0),
5211 )?;
5212 (with, without)
5213 };
5214
5215 Ok(EmbeddingStats {
5216 with_embeddings: with_embeddings as usize,
5217 without_embeddings: without_embeddings as usize,
5218 })
5219 }
5220}
5221
5222fn map_plan_row(row: &rusqlite::Row) -> rusqlite::Result<Plan> {
5224 let status_str: String = row.get(6)?;
5225 Ok(Plan {
5226 id: row.get(0)?,
5227 short_id: row.get(1)?,
5228 project_id: row.get(2)?,
5229 project_path: row.get(3)?,
5230 title: row.get(4)?,
5231 content: row.get(5)?,
5232 status: PlanStatus::from_str(&status_str),
5233 success_criteria: row.get(7)?,
5234 session_id: row.get(8)?,
5235 created_in_session: row.get(9)?,
5236 completed_in_session: row.get(10)?,
5237 source_path: row.get(11)?,
5238 source_hash: row.get(12)?,
5239 created_at: row.get(13)?,
5240 updated_at: row.get(14)?,
5241 completed_at: row.get(15)?,
5242 })
5243}
5244
5245fn map_project_row(row: &rusqlite::Row) -> rusqlite::Result<Project> {
5247 Ok(Project {
5248 id: row.get(0)?,
5249 project_path: row.get(1)?,
5250 name: row.get(2)?,
5251 description: row.get(3)?,
5252 issue_prefix: row.get(4)?,
5253 next_issue_number: row.get(5)?,
5254 plan_prefix: row.get(6)?,
5255 next_plan_number: row.get(7)?,
5256 created_at: row.get(8)?,
5257 updated_at: row.get(9)?,
5258 })
5259}
5260
5261fn map_issue_row(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
5263 Ok(Issue {
5264 id: row.get(0)?,
5265 short_id: row.get(1)?,
5266 project_path: row.get(2)?,
5267 title: row.get(3)?,
5268 description: row.get(4)?,
5269 details: row.get(5)?,
5270 status: row.get(6)?,
5271 priority: row.get(7)?,
5272 issue_type: row.get(8)?,
5273 plan_id: row.get(9)?,
5274 created_by_agent: row.get(10)?,
5275 assigned_to_agent: row.get(11)?,
5276 created_at: row.get(12)?,
5277 updated_at: row.get(13)?,
5278 closed_at: row.get(14)?,
5279 })
5280}
5281
5282fn map_time_entry_row(row: &rusqlite::Row) -> rusqlite::Result<TimeEntry> {
5283 Ok(TimeEntry {
5284 id: row.get(0)?,
5285 short_id: row.get(1)?,
5286 project_path: row.get(2)?,
5287 issue_id: row.get(3)?,
5288 period: row.get(4)?,
5289 hours: row.get(5)?,
5290 description: row.get(6)?,
5291 work_date: row.get(7)?,
5292 status: row.get(8)?,
5293 actor: row.get(9)?,
5294 created_at: row.get(10)?,
5295 updated_at: row.get(11)?,
5296 })
5297}
5298
/// A session record, as handed back by `get_session` and `list_sessions` in
/// the tests of this file.
///
/// NOTE(review): the timestamp fields are raw `i64` values; their epoch unit
/// is not visible in this part of the file — confirm before doing arithmetic.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Session {
    pub id: String,
    pub name: String,
    pub description: Option<String>,
    pub branch: Option<String>,
    pub channel: Option<String>,
    pub project_path: Option<String>,
    /// Status as text. The tests observe `"active"` right after creation and
    /// `"completed"` after `update_session_status`.
    pub status: String,
    /// `Some` once the session has been moved to `"completed"` (asserted in
    /// `test_session_crud`).
    pub ended_at: Option<i64>,
    pub created_at: i64,
    pub updated_at: i64,
}
5317
/// One row of the `context_items` table.
///
/// `get_items_needing_quality_upgrade` fills the fields from the columns of the
/// same names, in declaration order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ContextItem {
    pub id: String,
    /// Id of the session the item belongs to.
    pub session_id: String,
    pub key: String,
    pub value: String,
    /// Category as text; the tests use `"note"` and `"decision"`.
    pub category: String,
    /// Priority as text; the tests use `"high"` and `"normal"`.
    pub priority: String,
    pub channel: Option<String>,
    // NOTE(review): stored as a single optional string; the encoding of
    // multiple tags is not visible here — confirm against the writer.
    pub tags: Option<String>,
    // NOTE(review): presumably the size of `value`; unit not visible here.
    pub size: i64,
    pub created_at: i64,
    pub updated_at: i64,
}
5333
/// An issue record; `map_issue_row` builds it from result columns 0 through 14
/// in field declaration order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Issue {
    pub id: String,
    /// Optional human-friendly id (the tests use `"TST-1"`), accepted by
    /// `get_issue` together with a project path.
    pub short_id: Option<String>,
    pub project_path: String,
    pub title: String,
    pub description: Option<String>,
    pub details: Option<String>,
    /// Status as text; the tests observe `"open"`, `"in_progress"` and
    /// `"closed"`.
    pub status: String,
    // NOTE(review): numeric priority; the scale and direction are not visible
    // here (the tests only store and read back `3`).
    pub priority: i32,
    /// Issue type as text; the tests use `"task"`.
    pub issue_type: String,
    pub plan_id: Option<String>,
    pub created_by_agent: Option<String>,
    /// Set by `claim_issue` and cleared by `release_issue` in the tests.
    pub assigned_to_agent: Option<String>,
    pub created_at: i64,
    pub updated_at: i64,
    /// `Some` once the status has been set to `"closed"` (asserted in
    /// `test_issue_crud`).
    pub closed_at: Option<i64>,
}
5354
/// Per-status counters for an epic; `Default` yields all zeros.
///
/// NOTE(review): the code that fills these counters is outside this part of
/// the file. Presumably `total` is the number of child issues and the other
/// fields break that total down by issue status — confirm against the
/// producer.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct EpicProgress {
    pub total: usize,
    pub closed: usize,
    pub in_progress: usize,
    pub open: usize,
    pub blocked: usize,
    pub deferred: usize,
}
5365
/// A time-tracking record; `map_time_entry_row` builds it from result columns
/// 0 through 11 in field declaration order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TimeEntry {
    pub id: String,
    pub short_id: Option<String>,
    pub project_path: String,
    /// Optional id of an associated issue.
    pub issue_id: Option<String>,
    // NOTE(review): meaning and format of `period` are not visible here.
    pub period: Option<String>,
    pub hours: f64,
    pub description: String,
    // NOTE(review): stored as text; the date format is not visible here.
    pub work_date: String,
    pub status: String,
    pub actor: Option<String>,
    pub created_at: i64,
    pub updated_at: i64,
}
5382
/// A named checkpoint attached to a session.
///
/// NOTE(review): the queries that produce this type are outside this part of
/// the file; the field notes below are inferred from names and types only.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Checkpoint {
    pub id: String,
    pub session_id: String,
    pub name: String,
    pub description: Option<String>,
    // Presumably captured git state at checkpoint time — confirm with writer.
    pub git_status: Option<String>,
    pub git_branch: Option<String>,
    pub created_at: i64,
    // Presumably the number of context items in the checkpoint — confirm.
    pub item_count: i64,
}
5395
/// A key/value record scoped to a project path rather than to a session.
///
/// NOTE(review): the queries that produce this type are outside this part of
/// the file — confirm the uniqueness rules for `key` against the schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Memory {
    pub id: String,
    pub project_path: String,
    pub key: String,
    pub value: String,
    pub category: String,
    pub created_at: i64,
    pub updated_at: i64,
}
5407
/// A record that an entity was deleted.
///
/// NOTE(review): judging by the name this is used for sync; the producing and
/// consuming code is outside this part of the file — confirm.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SyncDeletion {
    /// Numeric id of the deletion record itself.
    pub id: i64,
    /// Kind of entity that was deleted, as text.
    pub entity_type: String,
    /// Id of the deleted entity.
    pub entity_id: String,
    pub project_path: String,
    /// Deletion timestamp (NOTE(review): epoch unit not visible here).
    pub deleted_at: i64,
    /// Identifier of whoever performed the deletion.
    pub deleted_by: String,
}
5424
/// One embedded chunk of a context item.
///
/// The fields mirror the columns the tests insert into `embedding_chunks`
/// (`id, item_id, chunk_index, chunk_text, embedding, dimensions, provider,
/// model, created_at`). Only `Serialize` is derived, not `Deserialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EmbeddingChunk {
    pub id: String,
    /// Id of the context item this chunk was cut from.
    pub item_id: String,
    /// Position of the chunk within its item.
    pub chunk_index: i32,
    /// The text that was embedded.
    pub chunk_text: String,
    /// The embedding vector.
    pub embedding: Vec<f32>,
    // NOTE(review): presumably equal to `embedding.len()` — confirm.
    pub dimensions: usize,
    pub provider: String,
    pub model: String,
    pub created_at: i64,
}
5450
/// Embedding coverage counts for context items.
///
/// As filled by `count_fast_embedding_status`, items in any status other than
/// the ones named below are counted in neither field.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EmbeddingStats {
    /// Number of items whose status is `'complete'`.
    pub with_embeddings: usize,
    /// Number of items whose status is NULL or `'none'`.
    pub without_embeddings: usize,
}
5459
/// One hit of a semantic search: the matching chunk plus fields of its item.
#[derive(Debug, Clone, serde::Serialize)]
pub struct SemanticSearchResult {
    /// Id of the context item the chunk belongs to.
    pub item_id: String,
    /// Index of the matching chunk within the item.
    pub chunk_index: i32,
    /// Text of the matching chunk.
    pub chunk_text: String,
    /// Cosine similarity between the query embedding and the chunk embedding,
    /// as computed by `cosine_similarity`.
    pub similarity: f32,
    /// Key of the owning context item.
    pub key: String,
    /// Value of the owning context item.
    pub value: String,
    /// Category of the owning context item.
    pub category: String,
    /// Priority of the owning context item.
    pub priority: String,
}
5480
/// Computes the cosine similarity of two vectors.
///
/// Returns `0.0` when the slices differ in length, when they are empty, or
/// when either vector has zero magnitude. Otherwise returns
/// `dot(a, b) / (|a| * |b|)`.
///
/// The sums are accumulated in `f64` and the two norms are square-rooted
/// separately. Multiplying the squared norms in `f32` first underflows to zero
/// for very small vectors and overflows to infinity for very large ones, which
/// made parallel vectors score `0.0` instead of `1.0`.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.len() != b.len() || a.is_empty() {
        return 0.0;
    }

    let (dot_product, norm_a, norm_b) = a.iter().zip(b).fold(
        (0.0_f64, 0.0_f64, 0.0_f64),
        |(dot, na, nb), (&x, &y)| {
            let (x, y) = (f64::from(x), f64::from(y));
            (dot + x * y, na + x * x, nb + y * y)
        },
    );

    let magnitude = norm_a.sqrt() * norm_b.sqrt();
    if magnitude == 0.0 {
        0.0
    } else {
        // Narrowing back to f32 is intentional: callers store and compare f32.
        (dot_product / magnitude) as f32
    }
}
5512
/// Produces a four-character lowercase hex id from the current wall-clock time.
///
/// The id is the low 16 bits of the number of milliseconds since the Unix
/// epoch, zero-padded to four hex digits. Ids therefore repeat every 65,536 ms
/// and are not unique on their own.
///
/// If the system clock reports a time before the Unix epoch, a zero duration
/// is used instead of panicking, which yields `"0000"`.
fn generate_short_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    format!("{:04x}", (millis & 0xFFFF) as u16)
}
5522
// Unit tests for the storage layer. Every test opens its own in-memory
// database, so the tests do not share state.
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening an in-memory database succeeds.
    #[test]
    fn test_open_memory() {
        let storage = SqliteStorage::open_memory();
        assert!(storage.is_ok());
    }

    /// Create, fetch, list and complete a session.
    #[test]
    fn test_session_crud() {
        let mut storage = SqliteStorage::open_memory().unwrap();

        // Create
        storage
            .create_session(
                "sess_1",
                "Test Session",
                Some("A test session"),
                Some("/test/project"),
                Some("main"),
                "test-actor",
            )
            .unwrap();

        // Fetch by id: a new session starts out "active".
        let session = storage.get_session("sess_1").unwrap();
        assert!(session.is_some());
        let session = session.unwrap();
        assert_eq!(session.name, "Test Session");
        assert_eq!(session.status, "active");

        // List, filtered by project path.
        let sessions = storage
            .list_sessions(Some("/test/project"), None, None)
            .unwrap();
        assert_eq!(sessions.len(), 1);

        // Completing the session must also stamp `ended_at`.
        storage
            .update_session_status("sess_1", "completed", "test-actor")
            .unwrap();
        let session = storage.get_session("sess_1").unwrap().unwrap();
        assert_eq!(session.status, "completed");
        assert!(session.ended_at.is_some());
    }

    /// Save, overwrite and delete a context item.
    #[test]
    fn test_context_item_crud() {
        let mut storage = SqliteStorage::open_memory().unwrap();

        // Items are saved into a session, so create one first.
        storage
            .create_session("sess_1", "Test", None, None, None, "actor")
            .unwrap();

        // Save a new item.
        storage
            .save_context_item(
                "item_1",
                "sess_1",
                "test-key",
                "test value",
                Some("note"),
                Some("high"),
                "actor",
            )
            .unwrap();

        // Read it back.
        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].key, "test-key");
        assert_eq!(items[0].priority, "high");

        // Saving again under the same key must update in place, not add a row.
        storage
            .save_context_item(
                "item_1",
                "sess_1",
                "test-key",
                "updated value",
                Some("decision"),
                None,
                "actor",
            )
            .unwrap();

        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].value, "updated value");

        // Delete by session and key.
        storage
            .delete_context_item("sess_1", "test-key", "actor")
            .unwrap();
        let items = storage.get_context_items("sess_1", None, None, None).unwrap();
        assert_eq!(items.len(), 0);
    }

    /// Create, look up, list, claim, release and close an issue.
    #[test]
    fn test_issue_crud() {
        let mut storage = SqliteStorage::open_memory().unwrap();

        // Create an issue with short id "TST-1", type "task" and priority 3.
        storage
            .create_issue(
                "issue_1",
                Some("TST-1"),
                "/test/project",
                "Test Issue",
                Some("Description"),
                None, Some("task"), Some(3), None, "actor",
            )
            .unwrap();

        // Look up by full id.
        let issue = storage.get_issue("issue_1", None).unwrap();
        assert!(issue.is_some());
        let issue = issue.unwrap();
        assert_eq!(issue.title, "Test Issue");
        assert_eq!(issue.priority, 3);

        // Look up by short id together with the project path.
        let issue = storage
            .get_issue("TST-1", Some("/test/project"))
            .unwrap();
        assert!(issue.is_some());

        // List by project.
        let issues = storage
            .list_issues("/test/project", None, None, None)
            .unwrap();
        assert_eq!(issues.len(), 1);

        // Claiming assigns the agent and moves the issue to "in_progress".
        storage.claim_issue("issue_1", "agent-1").unwrap();
        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
        assert_eq!(issue.assigned_to_agent, Some("agent-1".to_string()));
        assert_eq!(issue.status, "in_progress");

        // Releasing clears the assignee and returns the issue to "open".
        storage.release_issue("issue_1", "agent-1").unwrap();
        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
        assert!(issue.assigned_to_agent.is_none());
        assert_eq!(issue.status, "open");

        // Closing must also stamp `closed_at`.
        storage
            .update_issue_status("issue_1", "closed", "actor")
            .unwrap();
        let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
        assert_eq!(issue.status, "closed");
        assert!(issue.closed_at.is_some());
    }

    /// `get_items_without_embeddings` must return items whose embedding status
    /// is 'none', 'pending', 'error' or NULL, and must skip 'complete' ones.
    #[test]
    fn test_get_items_without_embeddings_includes_pending() {
        let mut storage = SqliteStorage::open_memory().unwrap();
        storage
            .create_session("sess_1", "Test", None, None, None, "actor")
            .unwrap();

        // One item per explicit status; the status is forced with a direct
        // UPDATE after the item has been saved.
        for (id, key, status) in [
            ("item_1", "none-status", "none"),
            ("item_2", "pending-status", "pending"),
            ("item_3", "error-status", "error"),
            ("item_4", "complete-status", "complete"),
        ] {
            storage
                .save_context_item(id, "sess_1", key, "test value", Some("note"), Some("normal"), "actor")
                .unwrap();
            storage.conn.execute(
                "UPDATE context_items SET embedding_status = ?1 WHERE id = ?2",
                rusqlite::params![status, id],
            ).unwrap();
        }

        // A fifth item whose status is explicitly NULL.
        storage
            .save_context_item("item_5", "sess_1", "null-status", "test", Some("note"), Some("normal"), "actor")
            .unwrap();
        storage.conn.execute(
            "UPDATE context_items SET embedding_status = NULL WHERE id = 'item_5'",
            [],
        ).unwrap();

        let items = storage.get_items_without_embeddings(None, None).unwrap();
        let keys: Vec<&str> = items.iter().map(|i| i.key.as_str()).collect();

        // Every not-yet-embedded status must be present.
        assert!(keys.contains(&"none-status"), "missing 'none' status");
        assert!(keys.contains(&"pending-status"), "missing 'pending' status");
        assert!(keys.contains(&"error-status"), "missing 'error' status");
        assert!(keys.contains(&"null-status"), "missing NULL status");

        // The already-embedded item must be left out.
        assert!(!keys.contains(&"complete-status"), "'complete' should be excluded");
        assert_eq!(items.len(), 4);
    }

    /// The optional session filter of `get_items_without_embeddings` restricts
    /// results to that session; `None` returns items from every session.
    #[test]
    fn test_get_items_without_embeddings_session_filter() {
        let mut storage = SqliteStorage::open_memory().unwrap();
        storage.create_session("sess_1", "Session 1", None, None, None, "actor").unwrap();
        storage.create_session("sess_2", "Session 2", None, None, None, "actor").unwrap();

        storage.save_context_item("item_1", "sess_1", "s1-item", "val", Some("note"), Some("normal"), "actor").unwrap();
        storage.save_context_item("item_2", "sess_2", "s2-item", "val", Some("note"), Some("normal"), "actor").unwrap();

        // Mark every item as pending so both are candidates.
        storage.conn.execute("UPDATE context_items SET embedding_status = 'pending'", []).unwrap();

        // Filtered by session: only that session's item comes back.
        let s1_items = storage.get_items_without_embeddings(Some("sess_1"), None).unwrap();
        assert_eq!(s1_items.len(), 1);
        assert_eq!(s1_items[0].key, "s1-item");

        // Unfiltered: both items come back.
        let all_items = storage.get_items_without_embeddings(None, None).unwrap();
        assert_eq!(all_items.len(), 2);
    }

    /// `resync_embedding_status` must reset items marked 'complete' that have
    /// no chunk rows, and must leave every other item untouched.
    #[test]
    fn test_resync_embedding_status() {
        let mut storage = SqliteStorage::open_memory().unwrap();
        storage.create_session("sess_1", "Test", None, None, None, "actor").unwrap();

        // item_1: will be 'complete' without chunks (the phantom case).
        storage.save_context_item("item_1", "sess_1", "phantom", "val", Some("note"), Some("normal"), "actor").unwrap();
        // item_2: will be 'complete' with a real chunk row.
        storage.save_context_item("item_2", "sess_1", "real", "val", Some("note"), Some("normal"), "actor").unwrap();
        // item_3: will already be 'pending'.
        storage.save_context_item("item_3", "sess_1", "pending-already", "val", Some("note"), Some("normal"), "actor").unwrap();

        // Mark everything complete, then put item_3 back to pending.
        storage.conn.execute("UPDATE context_items SET embedding_status = 'complete'", []).unwrap();
        storage.conn.execute("UPDATE context_items SET embedding_status = 'pending' WHERE id = 'item_3'", []).unwrap();

        // Give item_2 one chunk row so its 'complete' status is backed by data.
        storage.conn.execute(
            "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
             VALUES ('ec_1', 'item_2', 0, 'test', X'00000000', 1, 'test', 'test-model', 1000)",
            [],
        ).unwrap();

        // Exactly one item (item_1) should be reset.
        let count = storage.resync_embedding_status().unwrap();
        assert_eq!(count, 1, "only item_1 should be reset (phantom complete)");

        // item_1 has gone back to pending.
        let status_1: String = storage.conn.query_row(
            "SELECT embedding_status FROM context_items WHERE id = 'item_1'", [], |r| r.get(0)
        ).unwrap();
        assert_eq!(status_1, "pending", "phantom complete should be reset");

        let status_2: String = storage.conn.query_row(
            "SELECT embedding_status FROM context_items WHERE id = 'item_2'", [], |r| r.get(0)
        ).unwrap();
        assert_eq!(status_2, "complete", "real complete should be untouched");

        let status_3: String = storage.conn.query_row(
            "SELECT embedding_status FROM context_items WHERE id = 'item_3'", [], |r| r.get(0)
        ).unwrap();
        assert_eq!(status_3, "pending", "already-pending should be untouched");
    }
}