1use crate::error::{Error, Result};
7use crate::model::{Plan, PlanStatus, Project};
8use crate::storage::events::{insert_event, Event, EventType};
9use crate::storage::schema::apply_schema;
10use rusqlite::{Connection, OptionalExtension, Transaction};
11use std::collections::HashSet;
12use std::path::Path;
13use std::time::Duration;
14
15#[derive(Debug)]
17pub struct SqliteStorage {
18 conn: Connection,
19}
20
21pub struct MutationContext {
28 pub op_name: String,
30 pub actor: String,
32 pub events: Vec<Event>,
34 pub dirty_sessions: HashSet<String>,
36 pub dirty_issues: HashSet<String>,
37 pub dirty_items: HashSet<String>,
38}
39
40impl MutationContext {
41 #[must_use]
43 pub fn new(op_name: &str, actor: &str) -> Self {
44 Self {
45 op_name: op_name.to_string(),
46 actor: actor.to_string(),
47 events: Vec::new(),
48 dirty_sessions: HashSet::new(),
49 dirty_issues: HashSet::new(),
50 dirty_items: HashSet::new(),
51 }
52 }
53
54 pub fn record_event(
56 &mut self,
57 entity_type: &str,
58 entity_id: &str,
59 event_type: EventType,
60 ) {
61 self.events.push(Event::new(
62 entity_type,
63 entity_id,
64 event_type,
65 &self.actor,
66 ));
67 }
68
69 pub fn record_change(
71 &mut self,
72 entity_type: &str,
73 entity_id: &str,
74 event_type: EventType,
75 old_value: Option<String>,
76 new_value: Option<String>,
77 ) {
78 self.events.push(
79 Event::new(entity_type, entity_id, event_type, &self.actor)
80 .with_values(old_value, new_value),
81 );
82 }
83
84 pub fn mark_session_dirty(&mut self, session_id: &str) {
86 self.dirty_sessions.insert(session_id.to_string());
87 }
88
89 pub fn mark_issue_dirty(&mut self, issue_id: &str) {
91 self.dirty_issues.insert(issue_id.to_string());
92 }
93
94 pub fn mark_item_dirty(&mut self, item_id: &str) {
96 self.dirty_items.insert(item_id.to_string());
97 }
98}
99
100#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
105pub struct BackfillStats {
106 pub sessions: usize,
108 pub issues: usize,
110 pub context_items: usize,
112}
113
114impl BackfillStats {
115 #[must_use]
117 pub fn any(&self) -> bool {
118 self.sessions > 0 || self.issues > 0 || self.context_items > 0
119 }
120
121 #[must_use]
123 pub fn total(&self) -> usize {
124 self.sessions + self.issues + self.context_items
125 }
126}
127
128#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
133pub struct ProjectCounts {
134 pub sessions: usize,
136 pub issues: usize,
138 pub context_items: usize,
140 pub memories: usize,
142 pub checkpoints: usize,
144}
145
146impl ProjectCounts {
147 #[must_use]
149 pub fn total(&self) -> usize {
150 self.sessions + self.issues + self.context_items + self.memories + self.checkpoints
151 }
152}
153
154impl SqliteStorage {
155 pub fn open(path: &Path) -> Result<Self> {
163 Self::open_with_timeout(path, None)
164 }
165
166 pub fn open_with_timeout(path: &Path, timeout_ms: Option<u64>) -> Result<Self> {
172 let conn = Connection::open(path)?;
173
174 if let Some(timeout) = timeout_ms {
175 conn.busy_timeout(Duration::from_millis(timeout))?;
176 } else {
177 conn.busy_timeout(Duration::from_secs(5))?;
179 }
180
181 apply_schema(&conn)?;
182 Ok(Self { conn })
183 }
184
185 pub fn open_memory() -> Result<Self> {
191 let conn = Connection::open_in_memory()?;
192 apply_schema(&conn)?;
193 Ok(Self { conn })
194 }
195
196 #[must_use]
198 pub fn conn(&self) -> &Connection {
199 &self.conn
200 }
201
202 pub fn mutate<F, R>(&mut self, op: &str, actor: &str, f: F) -> Result<R>
215 where
216 F: FnOnce(&Transaction, &mut MutationContext) -> Result<R>,
217 {
218 let tx = self
219 .conn
220 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
221
222 let mut ctx = MutationContext::new(op, actor);
223
224 let result = f(&tx, &mut ctx)?;
226
227 for event in &ctx.events {
229 insert_event(&tx, event)?;
230 }
231
232 tx.commit()?;
237
238 Ok(result)
239 }
240
241 pub fn create_session(
251 &mut self,
252 id: &str,
253 name: &str,
254 description: Option<&str>,
255 project_path: Option<&str>,
256 branch: Option<&str>,
257 actor: &str,
258 ) -> Result<()> {
259 let now = chrono::Utc::now().timestamp_millis();
260
261 self.mutate("create_session", actor, |tx, ctx| {
262 tx.execute(
263 "INSERT INTO sessions (id, name, description, project_path, branch, status, created_at, updated_at)
264 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?6)",
265 rusqlite::params![id, name, description, project_path, branch, now],
266 )?;
267
268 if let Some(path) = project_path {
270 tx.execute(
271 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
272 rusqlite::params![id, path, now],
273 )?;
274 }
275
276 ctx.record_event("session", id, EventType::SessionCreated);
277 ctx.mark_session_dirty(id);
278
279 Ok(())
280 })
281 }
282
283 pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
289 let mut stmt = self.conn.prepare(
290 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
291 FROM sessions WHERE id = ?1",
292 )?;
293
294 let session = stmt
295 .query_row([id], |row| {
296 Ok(Session {
297 id: row.get(0)?,
298 name: row.get(1)?,
299 description: row.get(2)?,
300 branch: row.get(3)?,
301 channel: row.get(4)?,
302 project_path: row.get(5)?,
303 status: row.get(6)?,
304 ended_at: row.get(7)?,
305 created_at: row.get(8)?,
306 updated_at: row.get(9)?,
307 })
308 })
309 .optional()?;
310
311 Ok(session)
312 }
313
314 pub fn list_sessions(
320 &self,
321 project_path: Option<&str>,
322 status: Option<&str>,
323 limit: Option<u32>,
324 ) -> Result<Vec<Session>> {
325 self.list_sessions_with_search(project_path, status, limit, None)
326 }
327
328 pub fn list_sessions_with_search(
337 &self,
338 project_path: Option<&str>,
339 status: Option<&str>,
340 limit: Option<u32>,
341 search: Option<&str>,
342 ) -> Result<Vec<Session>> {
343 let limit = limit.unwrap_or(50);
344
345 let mut conditions: Vec<String> = Vec::new();
347 let mut params: Vec<String> = Vec::new();
348 let mut param_idx = 1;
349
350 let (from_clause, select_distinct) = if let Some(path) = project_path {
352 conditions.push(format!("sp.project_path = ?{param_idx}"));
354 params.push(path.to_string());
355 param_idx += 1;
356 (
357 "sessions s JOIN session_projects sp ON s.id = sp.session_id".to_string(),
358 "DISTINCT ",
359 )
360 } else {
361 ("sessions s".to_string(), "")
363 };
364
365 if let Some(st) = status {
366 conditions.push(format!("s.status = ?{param_idx}"));
367 params.push(st.to_string());
368 param_idx += 1;
369 }
370
371 if let Some(search_term) = search {
372 conditions.push(format!(
374 "(s.name LIKE ?{param_idx} COLLATE NOCASE OR s.description LIKE ?{param_idx} COLLATE NOCASE)"
375 ));
376 params.push(format!("%{search_term}%"));
377 param_idx += 1;
378 }
379
380 let where_clause = if conditions.is_empty() {
381 " WHERE 1=1".to_string()
382 } else {
383 format!(" WHERE {}", conditions.join(" AND "))
384 };
385
386 let sql = format!(
387 "SELECT {select_distinct}s.id, s.name, s.description, s.branch, s.channel, s.project_path, s.status, s.ended_at, s.created_at, s.updated_at
388 FROM {from_clause}{where_clause}
389 ORDER BY s.updated_at DESC LIMIT ?{param_idx}"
390 );
391 params.push(limit.to_string());
392
393 let mut stmt = self.conn.prepare(&sql)?;
394 let params_refs: Vec<&dyn rusqlite::ToSql> = params
395 .iter()
396 .map(|s| s as &dyn rusqlite::ToSql)
397 .collect();
398
399 let rows = stmt.query_map(params_refs.as_slice(), |row| {
400 Ok(Session {
401 id: row.get(0)?,
402 name: row.get(1)?,
403 description: row.get(2)?,
404 branch: row.get(3)?,
405 channel: row.get(4)?,
406 project_path: row.get(5)?,
407 status: row.get(6)?,
408 ended_at: row.get(7)?,
409 created_at: row.get(8)?,
410 updated_at: row.get(9)?,
411 })
412 })?;
413
414 rows.collect::<std::result::Result<Vec<_>, _>>()
415 .map_err(Error::from)
416 }
417
418 pub fn update_session_status(
424 &mut self,
425 id: &str,
426 status: &str,
427 actor: &str,
428 ) -> Result<()> {
429 let now = chrono::Utc::now().timestamp_millis();
430 let ended_at = if status == "completed" || status == "paused" {
431 Some(now)
432 } else {
433 None
434 };
435
436 self.mutate("update_session_status", actor, |tx, ctx| {
437 let rows = tx.execute(
438 "UPDATE sessions SET status = ?1, ended_at = ?2, updated_at = ?3 WHERE id = ?4",
439 rusqlite::params![status, ended_at, now, id],
440 )?;
441
442 if rows == 0 {
443 return Err(Error::SessionNotFound { id: id.to_string() });
444 }
445
446 let event_type = match status {
447 "paused" => EventType::SessionPaused,
448 "completed" => EventType::SessionCompleted,
449 _ => EventType::SessionUpdated,
450 };
451 ctx.record_event("session", id, event_type);
452 ctx.mark_session_dirty(id);
453
454 Ok(())
455 })
456 }
457
458 pub fn rename_session(
464 &mut self,
465 id: &str,
466 new_name: &str,
467 actor: &str,
468 ) -> Result<()> {
469 let now = chrono::Utc::now().timestamp_millis();
470
471 self.mutate("rename_session", actor, |tx, ctx| {
472 let rows = tx.execute(
473 "UPDATE sessions SET name = ?1, updated_at = ?2 WHERE id = ?3",
474 rusqlite::params![new_name, now, id],
475 )?;
476
477 if rows == 0 {
478 return Err(Error::SessionNotFound { id: id.to_string() });
479 }
480
481 ctx.record_event("session", id, EventType::SessionUpdated);
482 ctx.mark_session_dirty(id);
483
484 Ok(())
485 })
486 }
487
488 pub fn delete_session(&mut self, id: &str, actor: &str) -> Result<()> {
499 self.mutate("delete_session", actor, |tx, ctx| {
500 let exists: bool = tx
502 .query_row(
503 "SELECT 1 FROM sessions WHERE id = ?1",
504 [id],
505 |_| Ok(true),
506 )
507 .unwrap_or(false);
508
509 if !exists {
510 return Err(Error::SessionNotFound { id: id.to_string() });
511 }
512
513 tx.execute(
515 "DELETE FROM context_items WHERE session_id = ?1",
516 [id],
517 )?;
518
519 tx.execute(
521 "DELETE FROM checkpoints WHERE session_id = ?1",
522 [id],
523 )?;
524
525 tx.execute(
527 "DELETE FROM session_projects WHERE session_id = ?1",
528 [id],
529 )?;
530
531 tx.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
533
534 ctx.record_event("session", id, EventType::SessionDeleted);
535
536 Ok(())
537 })
538 }
539
540 pub fn add_session_path(
546 &mut self,
547 session_id: &str,
548 project_path: &str,
549 actor: &str,
550 ) -> Result<()> {
551 let now = chrono::Utc::now().timestamp_millis();
552
553 self.mutate("add_session_path", actor, |tx, ctx| {
554 let exists: bool = tx
556 .query_row(
557 "SELECT 1 FROM sessions WHERE id = ?1",
558 [session_id],
559 |_| Ok(true),
560 )
561 .unwrap_or(false);
562
563 if !exists {
564 return Err(Error::SessionNotFound { id: session_id.to_string() });
565 }
566
567 let result = tx.execute(
569 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
570 rusqlite::params![session_id, project_path, now],
571 );
572
573 match result {
574 Ok(_) => {
575 ctx.record_event("session", session_id, EventType::SessionPathAdded);
576 ctx.mark_session_dirty(session_id);
577 Ok(())
578 }
579 Err(rusqlite::Error::SqliteFailure(err, _))
580 if err.code == rusqlite::ErrorCode::ConstraintViolation =>
581 {
582 Err(Error::Other(format!(
583 "Path already added to session: {project_path}"
584 )))
585 }
586 Err(e) => Err(e.into()),
587 }
588 })
589 }
590
591 pub fn remove_session_path(
599 &mut self,
600 session_id: &str,
601 project_path: &str,
602 actor: &str,
603 ) -> Result<()> {
604 self.mutate("remove_session_path", actor, |tx, ctx| {
605 let session_path: Option<String> = tx
607 .query_row(
608 "SELECT project_path FROM sessions WHERE id = ?1",
609 [session_id],
610 |row| row.get(0),
611 )
612 .optional()?;
613
614 let primary_path = session_path.ok_or_else(|| Error::SessionNotFound {
615 id: session_id.to_string(),
616 })?;
617
618 if primary_path == project_path {
620 return Err(Error::Other(
621 "Cannot remove primary project path. Use delete_session instead.".to_string(),
622 ));
623 }
624
625 let rows = tx.execute(
627 "DELETE FROM session_projects WHERE session_id = ?1 AND project_path = ?2",
628 rusqlite::params![session_id, project_path],
629 )?;
630
631 if rows == 0 {
632 return Err(Error::Other(format!(
633 "Path not found in session: {project_path}"
634 )));
635 }
636
637 ctx.record_event("session", session_id, EventType::SessionPathRemoved);
638 ctx.mark_session_dirty(session_id);
639
640 Ok(())
641 })
642 }
643
644 pub fn get_session_paths(&self, session_id: &str) -> Result<Vec<String>> {
648 let conn = self.conn();
649
650 let primary_path: Option<String> = conn
652 .query_row(
653 "SELECT project_path FROM sessions WHERE id = ?1",
654 [session_id],
655 |row| row.get(0),
656 )
657 .optional()?;
658
659 let Some(primary) = primary_path else {
660 return Err(Error::SessionNotFound { id: session_id.to_string() });
661 };
662
663 let mut stmt = conn.prepare(
665 "SELECT project_path FROM session_projects WHERE session_id = ?1 ORDER BY added_at",
666 )?;
667
668 let additional_paths: Vec<String> = stmt
669 .query_map([session_id], |row| row.get(0))?
670 .filter_map(|r| r.ok())
671 .collect();
672
673 let mut paths = vec![primary];
675 paths.extend(additional_paths);
676
677 Ok(paths)
678 }
679
680 pub fn save_context_item(
690 &mut self,
691 id: &str,
692 session_id: &str,
693 key: &str,
694 value: &str,
695 category: Option<&str>,
696 priority: Option<&str>,
697 actor: &str,
698 ) -> Result<()> {
699 let now = chrono::Utc::now().timestamp_millis();
700 let category = category.unwrap_or("note");
701 let priority = priority.unwrap_or("normal");
702 let size = value.len() as i64;
703
704 self.mutate("save_context_item", actor, |tx, ctx| {
705 let exists: bool = tx
707 .prepare("SELECT 1 FROM context_items WHERE session_id = ?1 AND key = ?2")?
708 .exists(rusqlite::params![session_id, key])?;
709
710 tx.execute(
711 "INSERT INTO context_items (id, session_id, key, value, category, priority, size, created_at, updated_at)
712 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
713 ON CONFLICT(session_id, key) DO UPDATE SET
714 value = excluded.value,
715 category = excluded.category,
716 priority = excluded.priority,
717 size = excluded.size,
718 updated_at = excluded.updated_at",
719 rusqlite::params![id, session_id, key, value, category, priority, size, now],
720 )?;
721
722 let event_type = if exists {
723 EventType::ItemUpdated
724 } else {
725 EventType::ItemCreated
726 };
727 ctx.record_event("context_item", id, event_type);
728 ctx.mark_item_dirty(id);
729
730 Ok(())
731 })
732 }
733
734 pub fn get_context_items(
740 &self,
741 session_id: &str,
742 category: Option<&str>,
743 priority: Option<&str>,
744 limit: Option<u32>,
745 ) -> Result<Vec<ContextItem>> {
746 let limit = limit.unwrap_or(100);
747
748 let mut sql = String::from(
749 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
750 FROM context_items WHERE session_id = ?1",
751 );
752
753 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(session_id.to_string())];
754
755 if let Some(cat) = category {
756 sql.push_str(" AND category = ?");
757 params.push(Box::new(cat.to_string()));
758 }
759
760 if let Some(pri) = priority {
761 sql.push_str(" AND priority = ?");
762 params.push(Box::new(pri.to_string()));
763 }
764
765 sql.push_str(" ORDER BY created_at DESC LIMIT ?");
766 params.push(Box::new(limit));
767
768 let mut stmt = self.conn.prepare(&sql)?;
769 let params_refs: Vec<&dyn rusqlite::ToSql> = params
770 .iter()
771 .map(|b| b.as_ref())
772 .collect();
773
774 let rows = stmt.query_map(params_refs.as_slice(), |row| {
775 Ok(ContextItem {
776 id: row.get(0)?,
777 session_id: row.get(1)?,
778 key: row.get(2)?,
779 value: row.get(3)?,
780 category: row.get(4)?,
781 priority: row.get(5)?,
782 channel: row.get(6)?,
783 tags: row.get(7)?,
784 size: row.get(8)?,
785 created_at: row.get(9)?,
786 updated_at: row.get(10)?,
787 })
788 })?;
789
790 rows.collect::<std::result::Result<Vec<_>, _>>()
791 .map_err(Error::from)
792 }
793
794 pub fn delete_context_item(
800 &mut self,
801 session_id: &str,
802 key: &str,
803 actor: &str,
804 ) -> Result<()> {
805 self.mutate("delete_context_item", actor, |tx, ctx| {
806 let info: Option<(String, Option<String>)> = tx
808 .query_row(
809 "SELECT ci.id, s.project_path
810 FROM context_items ci
811 JOIN sessions s ON ci.session_id = s.id
812 WHERE ci.session_id = ?1 AND ci.key = ?2",
813 rusqlite::params![session_id, key],
814 |row| Ok((row.get(0)?, row.get(1)?)),
815 )
816 .optional()?;
817
818 let rows = tx.execute(
819 "DELETE FROM context_items WHERE session_id = ?1 AND key = ?2",
820 rusqlite::params![session_id, key],
821 )?;
822
823 if rows > 0 {
824 if let Some((item_id, project_path)) = info {
825 ctx.record_event("context_item", &item_id, EventType::ItemDeleted);
826
827 if let Some(ref path) = project_path {
829 let now = chrono::Utc::now().timestamp_millis();
830 tx.execute(
831 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
832 VALUES ('context_item', ?1, ?2, ?3, ?4, 0)
833 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
834 deleted_at = excluded.deleted_at,
835 deleted_by = excluded.deleted_by,
836 exported = 0",
837 rusqlite::params![item_id, path, now, ctx.actor],
838 )?;
839 }
840 }
841 }
842
843 Ok(())
844 })
845 }
846
847 pub fn update_context_item(
853 &mut self,
854 session_id: &str,
855 key: &str,
856 value: Option<&str>,
857 category: Option<&str>,
858 priority: Option<&str>,
859 channel: Option<&str>,
860 actor: &str,
861 ) -> Result<()> {
862 self.mutate("update_context_item", actor, |tx, ctx| {
863 let now = chrono::Utc::now().timestamp_millis();
864
865 let mut set_parts: Vec<&str> = vec!["updated_at"];
867 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
868
869 if let Some(v) = value {
870 set_parts.push("value");
871 set_parts.push("size");
872 params.push(Box::new(v.to_string()));
873 params.push(Box::new(v.len() as i64));
874 }
875 if let Some(c) = category {
876 set_parts.push("category");
877 params.push(Box::new(c.to_string()));
878 }
879 if let Some(p) = priority {
880 set_parts.push("priority");
881 params.push(Box::new(p.to_string()));
882 }
883 if let Some(ch) = channel {
884 set_parts.push("channel");
885 params.push(Box::new(ch.to_string()));
886 }
887
888 let item_id: Option<String> = tx
890 .query_row(
891 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
892 rusqlite::params![session_id, key],
893 |row| row.get(0),
894 )
895 .optional()?;
896
897 if item_id.is_none() {
898 return Err(Error::Database(rusqlite::Error::QueryReturnedNoRows));
899 }
900
901 let set_clause: String = set_parts
903 .iter()
904 .enumerate()
905 .map(|(i, field)| format!("{} = ?{}", field, i + 1))
906 .collect::<Vec<_>>()
907 .join(", ");
908
909 let param_count = params.len();
910 let query = format!(
911 "UPDATE context_items SET {} WHERE session_id = ?{} AND key = ?{}",
912 set_clause,
913 param_count + 1,
914 param_count + 2
915 );
916
917 params.push(Box::new(session_id.to_string()));
918 params.push(Box::new(key.to_string()));
919
920 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
921 tx.execute(&query, param_refs.as_slice())?;
922
923 if let Some(id) = item_id {
924 ctx.record_event("context_item", &id, EventType::ItemUpdated);
925 }
926
927 Ok(())
928 })
929 }
930
931 pub fn add_tags_to_item(
937 &mut self,
938 session_id: &str,
939 key: &str,
940 tags_to_add: &[String],
941 actor: &str,
942 ) -> Result<()> {
943 self.mutate("add_tags_to_item", actor, |tx, ctx| {
944 let now = chrono::Utc::now().timestamp_millis();
945
946 let (item_id, current_tags): (String, String) = tx.query_row(
948 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
949 rusqlite::params![session_id, key],
950 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
951 )?;
952
953 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
955
956 for tag in tags_to_add {
958 if !tags.contains(tag) {
959 tags.push(tag.clone());
960 }
961 }
962
963 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
965 tx.execute(
966 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
967 rusqlite::params![new_tags, now, item_id],
968 )?;
969
970 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
971
972 Ok(())
973 })
974 }
975
976 pub fn remove_tags_from_item(
982 &mut self,
983 session_id: &str,
984 key: &str,
985 tags_to_remove: &[String],
986 actor: &str,
987 ) -> Result<()> {
988 self.mutate("remove_tags_from_item", actor, |tx, ctx| {
989 let now = chrono::Utc::now().timestamp_millis();
990
991 let (item_id, current_tags): (String, String) = tx.query_row(
993 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
994 rusqlite::params![session_id, key],
995 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
996 )?;
997
998 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
1000
1001 tags.retain(|t| !tags_to_remove.contains(t));
1003
1004 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1006 tx.execute(
1007 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1008 rusqlite::params![new_tags, now, item_id],
1009 )?;
1010
1011 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1012
1013 Ok(())
1014 })
1015 }
1016
1017 #[allow(clippy::too_many_arguments)]
1027 pub fn create_issue(
1028 &mut self,
1029 id: &str,
1030 short_id: Option<&str>,
1031 project_path: &str,
1032 title: &str,
1033 description: Option<&str>,
1034 details: Option<&str>,
1035 issue_type: Option<&str>,
1036 priority: Option<i32>,
1037 plan_id: Option<&str>,
1038 actor: &str,
1039 ) -> Result<()> {
1040 let now = chrono::Utc::now().timestamp_millis();
1041 let issue_type = issue_type.unwrap_or("task");
1042 let priority = priority.unwrap_or(2);
1043
1044 self.mutate("create_issue", actor, |tx, ctx| {
1045 tx.execute(
1046 "INSERT INTO issues (id, short_id, project_path, title, description, details, issue_type, priority, plan_id, status, created_by_agent, created_at, updated_at)
1047 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 'open', ?10, ?11, ?11)",
1048 rusqlite::params![id, short_id, project_path, title, description, details, issue_type, priority, plan_id, actor, now],
1049 )?;
1050
1051 ctx.record_event("issue", id, EventType::IssueCreated);
1052 ctx.mark_issue_dirty(id);
1053
1054 Ok(())
1055 })
1056 }
1057
1058 pub fn get_issue(&self, id: &str, project_path: Option<&str>) -> Result<Option<Issue>> {
1064 let sql = if project_path.is_some() {
1066 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1067 FROM issues WHERE (id = ?1 OR short_id = ?1) AND project_path = ?2"
1068 } else {
1069 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1070 FROM issues WHERE id = ?1 OR short_id = ?1"
1071 };
1072
1073 let mut stmt = self.conn.prepare(sql)?;
1074
1075 let issue = if let Some(path) = project_path {
1076 stmt.query_row(rusqlite::params![id, path], map_issue_row)
1077 } else {
1078 stmt.query_row([id], map_issue_row)
1079 }
1080 .optional()?;
1081
1082 Ok(issue)
1083 }
1084
1085 pub fn list_issues(
1091 &self,
1092 project_path: &str,
1093 status: Option<&str>,
1094 issue_type: Option<&str>,
1095 limit: Option<u32>,
1096 ) -> Result<Vec<Issue>> {
1097 let limit = limit.unwrap_or(50);
1098
1099 let mut sql = String::from(
1100 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1101 FROM issues WHERE project_path = ?1",
1102 );
1103
1104 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
1105
1106 if let Some(st) = status {
1107 if st != "all" {
1108 sql.push_str(" AND status = ?");
1109 params.push(Box::new(st.to_string()));
1110 }
1111 } else {
1112 sql.push_str(" AND status != 'closed'");
1114 }
1115
1116 if let Some(t) = issue_type {
1117 sql.push_str(" AND issue_type = ?");
1118 params.push(Box::new(t.to_string()));
1119 }
1120
1121 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1122 params.push(Box::new(limit));
1123
1124 let mut stmt = self.conn.prepare(&sql)?;
1125 let params_refs: Vec<&dyn rusqlite::ToSql> = params
1126 .iter()
1127 .map(|b| b.as_ref())
1128 .collect();
1129
1130 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1131
1132 rows.collect::<std::result::Result<Vec<_>, _>>()
1133 .map_err(Error::from)
1134 }
1135
1136 pub fn list_all_issues(
1142 &self,
1143 status: Option<&str>,
1144 issue_type: Option<&str>,
1145 limit: Option<u32>,
1146 ) -> Result<Vec<Issue>> {
1147 let limit = limit.unwrap_or(50);
1148
1149 let mut sql = String::from(
1150 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1151 FROM issues WHERE 1=1",
1152 );
1153
1154 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1155
1156 if let Some(st) = status {
1157 if st != "all" {
1158 sql.push_str(" AND status = ?");
1159 params.push(Box::new(st.to_string()));
1160 }
1161 } else {
1162 sql.push_str(" AND status != 'closed'");
1164 }
1165
1166 if let Some(t) = issue_type {
1167 sql.push_str(" AND issue_type = ?");
1168 params.push(Box::new(t.to_string()));
1169 }
1170
1171 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1172 params.push(Box::new(limit));
1173
1174 let mut stmt = self.conn.prepare(&sql)?;
1175 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1176
1177 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1178
1179 rows.collect::<std::result::Result<Vec<_>, _>>()
1180 .map_err(Error::from)
1181 }
1182
1183 pub fn update_issue_status(
1191 &mut self,
1192 id: &str,
1193 status: &str,
1194 actor: &str,
1195 ) -> Result<()> {
1196 let now = chrono::Utc::now().timestamp_millis();
1197 let closed_at = if status == "closed" { Some(now) } else { None };
1198
1199 self.mutate("update_issue_status", actor, |tx, ctx| {
1200 let rows = tx.execute(
1201 "UPDATE issues SET status = ?1, closed_at = ?2, closed_by_agent = ?3, updated_at = ?4 WHERE id = ?5 OR short_id = ?5",
1202 rusqlite::params![status, closed_at, if status == "closed" { Some(actor) } else { None }, now, id],
1203 )?;
1204
1205 if rows == 0 {
1206 return Err(Error::IssueNotFound { id: id.to_string() });
1207 }
1208
1209 let event_type = if status == "closed" {
1210 EventType::IssueClosed
1211 } else {
1212 EventType::IssueUpdated
1213 };
1214 ctx.record_event("issue", id, event_type);
1215 ctx.mark_issue_dirty(id);
1216
1217 Ok(())
1218 })
1219 }
1220
1221 #[allow(clippy::too_many_arguments)]
1229 pub fn update_issue(
1230 &mut self,
1231 id: &str,
1232 title: Option<&str>,
1233 description: Option<&str>,
1234 details: Option<&str>,
1235 priority: Option<i32>,
1236 issue_type: Option<&str>,
1237 plan_id: Option<&str>,
1238 parent_id: Option<&str>,
1239 actor: &str,
1240 ) -> Result<()> {
1241 let now = chrono::Utc::now().timestamp_millis();
1242
1243 let mut set_clauses = vec!["updated_at = ?"];
1245 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1246
1247 if let Some(t) = title {
1248 set_clauses.push("title = ?");
1249 params.push(Box::new(t.to_string()));
1250 }
1251 if let Some(d) = description {
1252 set_clauses.push("description = ?");
1253 params.push(Box::new(d.to_string()));
1254 }
1255 if let Some(dt) = details {
1256 set_clauses.push("details = ?");
1257 params.push(Box::new(dt.to_string()));
1258 }
1259 if let Some(p) = priority {
1260 set_clauses.push("priority = ?");
1261 params.push(Box::new(p));
1262 }
1263 if let Some(it) = issue_type {
1264 set_clauses.push("issue_type = ?");
1265 params.push(Box::new(it.to_string()));
1266 }
1267 if let Some(pid) = plan_id {
1268 set_clauses.push("plan_id = ?");
1269 params.push(Box::new(pid.to_string()));
1270 }
1271
1272 if set_clauses.len() == 1 && parent_id.is_none() {
1274 return Ok(());
1275 }
1276
1277 self.mutate("update_issue", actor, |tx, ctx| {
1278 if set_clauses.len() > 1 {
1280 let sql = format!(
1281 "UPDATE issues SET {} WHERE id = ? OR short_id = ?",
1282 set_clauses.join(", ")
1283 );
1284 params.push(Box::new(id.to_string()));
1285 params.push(Box::new(id.to_string()));
1286
1287 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1288 let rows = tx.execute(&sql, param_refs.as_slice())?;
1289
1290 if rows == 0 {
1291 return Err(Error::IssueNotFound { id: id.to_string() });
1292 }
1293 }
1294
1295 if let Some(new_parent) = parent_id {
1297 let full_id: String = tx.query_row(
1299 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1300 [id],
1301 |row| row.get(0),
1302 )?;
1303
1304 tx.execute(
1306 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND dependency_type = 'parent-child'",
1307 [&full_id],
1308 )?;
1309
1310 if !new_parent.is_empty() {
1312 let parent_full_id: String = tx.query_row(
1313 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1314 [new_parent],
1315 |row| row.get(0),
1316 )?;
1317
1318 tx.execute(
1319 "INSERT INTO issue_dependencies (issue_id, depends_on_id, dependency_type, created_at)
1320 VALUES (?1, ?2, 'parent-child', ?3)",
1321 rusqlite::params![full_id, parent_full_id, now],
1322 )?;
1323 }
1324 }
1325
1326 ctx.record_event("issue", id, EventType::IssueUpdated);
1327 ctx.mark_issue_dirty(id);
1328
1329 Ok(())
1330 })
1331 }
1332
1333 pub fn claim_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1341 let now = chrono::Utc::now().timestamp_millis();
1342
1343 self.mutate("claim_issue", actor, |tx, ctx| {
1344 let rows = tx.execute(
1345 "UPDATE issues SET assigned_to_agent = ?1, assigned_at = ?2, status = 'in_progress', updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
1346 rusqlite::params![actor, now, id],
1347 )?;
1348
1349 if rows == 0 {
1350 return Err(Error::IssueNotFound { id: id.to_string() });
1351 }
1352
1353 ctx.record_event("issue", id, EventType::IssueClaimed);
1354 ctx.mark_issue_dirty(id);
1355
1356 Ok(())
1357 })
1358 }
1359
1360 pub fn release_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1368 let now = chrono::Utc::now().timestamp_millis();
1369
1370 self.mutate("release_issue", actor, |tx, ctx| {
1371 let rows = tx.execute(
1372 "UPDATE issues SET assigned_to_agent = NULL, assigned_at = NULL, status = 'open', updated_at = ?1 WHERE id = ?2 OR short_id = ?2",
1373 rusqlite::params![now, id],
1374 )?;
1375
1376 if rows == 0 {
1377 return Err(Error::IssueNotFound { id: id.to_string() });
1378 }
1379
1380 ctx.record_event("issue", id, EventType::IssueReleased);
1381 ctx.mark_issue_dirty(id);
1382
1383 Ok(())
1384 })
1385 }
1386
1387 pub fn delete_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1395 self.mutate("delete_issue", actor, |tx, ctx| {
1396 let info: Option<(String, String)> = tx
1398 .query_row(
1399 "SELECT id, project_path FROM issues WHERE id = ?1 OR short_id = ?1",
1400 [id],
1401 |row| Ok((row.get(0)?, row.get(1)?)),
1402 )
1403 .optional()?;
1404
1405 let (full_id, project_path) =
1406 info.ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1407
1408 tx.execute(
1410 "DELETE FROM issue_dependencies WHERE issue_id = ?1 OR depends_on_id = ?1",
1411 [&full_id],
1412 )?;
1413
1414 let rows = tx.execute("DELETE FROM issues WHERE id = ?1", [&full_id])?;
1416
1417 if rows == 0 {
1418 return Err(Error::IssueNotFound { id: id.to_string() });
1419 }
1420
1421 ctx.record_event("issue", &full_id, EventType::IssueDeleted);
1422
1423 let now = chrono::Utc::now().timestamp_millis();
1425 tx.execute(
1426 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
1427 VALUES ('issue', ?1, ?2, ?3, ?4, 0)
1428 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
1429 deleted_at = excluded.deleted_at,
1430 deleted_by = excluded.deleted_by,
1431 exported = 0",
1432 rusqlite::params![full_id, project_path, now, ctx.actor],
1433 )?;
1434
1435 Ok(())
1436 })
1437 }
1438
1439 pub fn add_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1445 self.mutate("add_issue_labels", actor, |tx, ctx| {
1446 let full_id: String = tx
1448 .query_row(
1449 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1450 [id],
1451 |row| row.get(0),
1452 )
1453 .optional()?
1454 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1455
1456 for label in labels {
1457 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1458 tx.execute(
1459 "INSERT OR IGNORE INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1460 rusqlite::params![label_id, full_id, label],
1461 )?;
1462 }
1463
1464 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1465 Ok(())
1466 })
1467 }
1468
1469 pub fn remove_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1475 self.mutate("remove_issue_labels", actor, |tx, ctx| {
1476 let full_id: String = tx
1478 .query_row(
1479 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1480 [id],
1481 |row| row.get(0),
1482 )
1483 .optional()?
1484 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1485
1486 for label in labels {
1487 tx.execute(
1488 "DELETE FROM issue_labels WHERE issue_id = ?1 AND label = ?2",
1489 rusqlite::params![full_id, label],
1490 )?;
1491 }
1492
1493 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1494 Ok(())
1495 })
1496 }
1497
1498 pub fn get_issue_labels(&self, id: &str) -> Result<Vec<String>> {
1504 let full_id: String = self
1505 .conn
1506 .query_row(
1507 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1508 [id],
1509 |row| row.get(0),
1510 )
1511 .optional()?
1512 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1513
1514 let mut stmt = self
1515 .conn
1516 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1 ORDER BY label")?;
1517 let labels = stmt
1518 .query_map([&full_id], |row| row.get(0))?
1519 .collect::<std::result::Result<Vec<String>, _>>()?;
1520 Ok(labels)
1521 }
1522
1523 pub fn issue_has_dependencies(&self, id: &str) -> Result<bool> {
1525 let full_id: String = self
1526 .conn
1527 .query_row(
1528 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1529 [id],
1530 |row| row.get(0),
1531 )
1532 .optional()?
1533 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1534
1535 let count: i64 = self.conn.query_row(
1536 "SELECT COUNT(*) FROM issue_dependencies WHERE issue_id = ?1",
1537 [&full_id],
1538 |row| row.get(0),
1539 )?;
1540 Ok(count > 0)
1541 }
1542
1543 pub fn issue_has_subtasks(&self, id: &str) -> Result<bool> {
1545 let full_id: String = self
1546 .conn
1547 .query_row(
1548 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1549 [id],
1550 |row| row.get(0),
1551 )
1552 .optional()?
1553 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1554
1555 let count: i64 = self.conn.query_row(
1556 "SELECT COUNT(*) FROM issue_dependencies WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1557 [&full_id],
1558 |row| row.get(0),
1559 )?;
1560 Ok(count > 0)
1561 }
1562
1563 pub fn get_child_issue_ids(&self, parent_id: &str) -> Result<std::collections::HashSet<String>> {
1567 let full_parent_id: String = self
1569 .conn
1570 .query_row(
1571 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1572 [parent_id],
1573 |row| row.get(0),
1574 )
1575 .optional()?
1576 .ok_or_else(|| Error::IssueNotFound { id: parent_id.to_string() })?;
1577
1578 let mut stmt = self.conn.prepare(
1579 "SELECT issue_id FROM issue_dependencies
1580 WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1581 )?;
1582
1583 let rows = stmt.query_map([&full_parent_id], |row| row.get::<_, String>(0))?;
1584
1585 let mut ids = std::collections::HashSet::new();
1586 for row in rows {
1587 ids.insert(row?);
1588 }
1589 Ok(ids)
1590 }
1591
1592 pub fn add_issue_dependency(
1598 &mut self,
1599 issue_id: &str,
1600 depends_on_id: &str,
1601 dependency_type: &str,
1602 actor: &str,
1603 ) -> Result<()> {
1604 self.mutate("add_issue_dependency", actor, |tx, ctx| {
1605 let full_issue_id: String = tx
1607 .query_row(
1608 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1609 [issue_id],
1610 |row| row.get(0),
1611 )
1612 .optional()?
1613 .ok_or_else(|| Error::IssueNotFound {
1614 id: issue_id.to_string(),
1615 })?;
1616
1617 let full_depends_on_id: String = tx
1618 .query_row(
1619 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1620 [depends_on_id],
1621 |row| row.get(0),
1622 )
1623 .optional()?
1624 .ok_or_else(|| Error::IssueNotFound {
1625 id: depends_on_id.to_string(),
1626 })?;
1627
1628 let dep_id = format!("dep_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1629 let now = chrono::Utc::now().timestamp_millis();
1630
1631 tx.execute(
1632 "INSERT OR IGNORE INTO issue_dependencies (id, issue_id, depends_on_id, dependency_type, created_at)
1633 VALUES (?1, ?2, ?3, ?4, ?5)",
1634 rusqlite::params![dep_id, full_issue_id, full_depends_on_id, dependency_type, now],
1635 )?;
1636
1637 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1638 Ok(())
1639 })
1640 }
1641
1642 pub fn remove_issue_dependency(
1648 &mut self,
1649 issue_id: &str,
1650 depends_on_id: &str,
1651 actor: &str,
1652 ) -> Result<()> {
1653 self.mutate("remove_issue_dependency", actor, |tx, ctx| {
1654 let full_issue_id: String = tx
1656 .query_row(
1657 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1658 [issue_id],
1659 |row| row.get(0),
1660 )
1661 .optional()?
1662 .ok_or_else(|| Error::IssueNotFound {
1663 id: issue_id.to_string(),
1664 })?;
1665
1666 let full_depends_on_id: String = tx
1667 .query_row(
1668 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1669 [depends_on_id],
1670 |row| row.get(0),
1671 )
1672 .optional()?
1673 .ok_or_else(|| Error::IssueNotFound {
1674 id: depends_on_id.to_string(),
1675 })?;
1676
1677 tx.execute(
1678 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND depends_on_id = ?2",
1679 rusqlite::params![full_issue_id, full_depends_on_id],
1680 )?;
1681
1682 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1683 Ok(())
1684 })
1685 }
1686
1687 pub fn clone_issue(
1693 &mut self,
1694 id: &str,
1695 new_title: Option<&str>,
1696 actor: &str,
1697 ) -> Result<Issue> {
1698 let source = self
1700 .get_issue(id, None)?
1701 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1702
1703 let new_id = format!("issue_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1704 let new_short_id = generate_short_id();
1705 let default_title = format!("Copy of {}", source.title);
1706 let title = new_title.unwrap_or(&default_title);
1707 let now = chrono::Utc::now().timestamp_millis();
1708
1709 self.mutate("clone_issue", actor, |tx, ctx| {
1710 tx.execute(
1711 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, created_at, updated_at)
1712 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'open', ?7, ?8, ?9, ?10, ?11, ?11)",
1713 rusqlite::params![
1714 new_id,
1715 new_short_id,
1716 source.project_path,
1717 title,
1718 source.description,
1719 source.details,
1720 source.priority,
1721 source.issue_type,
1722 source.plan_id,
1723 ctx.actor,
1724 now
1725 ],
1726 )?;
1727
1728 let labels: Vec<String> = tx
1730 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1")?
1731 .query_map([&source.id], |row| row.get(0))?
1732 .collect::<std::result::Result<Vec<String>, _>>()?;
1733
1734 for label in &labels {
1735 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1736 tx.execute(
1737 "INSERT INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1738 rusqlite::params![label_id, new_id, label],
1739 )?;
1740 }
1741
1742 ctx.record_event("issue", &new_id, EventType::IssueCreated);
1743 Ok(())
1744 })?;
1745
1746 self.get_issue(&new_id, None)?
1748 .ok_or_else(|| Error::Other("Failed to retrieve cloned issue".to_string()))
1749 }
1750
1751 pub fn mark_issue_duplicate(
1757 &mut self,
1758 id: &str,
1759 duplicate_of_id: &str,
1760 actor: &str,
1761 ) -> Result<()> {
1762 self.add_issue_dependency(id, duplicate_of_id, "duplicate-of", actor)?;
1764
1765 self.update_issue_status(id, "closed", actor)?;
1767
1768 Ok(())
1769 }
1770
1771 pub fn get_ready_issues(&self, project_path: &str, limit: u32) -> Result<Vec<Issue>> {
1777 let mut stmt = self.conn.prepare(
1778 "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1779 i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1780 i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1781 FROM issues i
1782 WHERE i.project_path = ?1
1783 AND i.status = 'open'
1784 AND i.assigned_to_agent IS NULL
1785 AND NOT EXISTS (
1786 SELECT 1 FROM issue_dependencies d
1787 JOIN issues dep ON dep.id = d.depends_on_id
1788 WHERE d.issue_id = i.id
1789 AND d.dependency_type = 'blocks'
1790 AND dep.status != 'closed'
1791 )
1792 ORDER BY i.priority DESC, i.created_at ASC
1793 LIMIT ?2",
1794 )?;
1795
1796 let issues = stmt
1797 .query_map(rusqlite::params![project_path, limit], |row| {
1798 Ok(Issue {
1799 id: row.get(0)?,
1800 short_id: row.get(1)?,
1801 project_path: row.get(2)?,
1802 title: row.get(3)?,
1803 description: row.get(4)?,
1804 details: row.get(5)?,
1805 status: row.get(6)?,
1806 priority: row.get(7)?,
1807 issue_type: row.get(8)?,
1808 plan_id: row.get(9)?,
1809 created_by_agent: row.get(10)?,
1810 assigned_to_agent: row.get(11)?,
1811 created_at: row.get(12)?,
1812 updated_at: row.get(13)?,
1813 closed_at: row.get(14)?,
1814 })
1815 })?
1816 .collect::<std::result::Result<Vec<_>, _>>()?;
1817
1818 Ok(issues)
1819 }
1820
1821 pub fn get_next_issue_block(
1827 &mut self,
1828 project_path: &str,
1829 count: u32,
1830 actor: &str,
1831 ) -> Result<Vec<Issue>> {
1832 let ready = self.get_ready_issues(project_path, count)?;
1833
1834 for issue in &ready {
1835 self.claim_issue(&issue.id, actor)?;
1836 }
1837
1838 let claimed: Vec<Issue> = ready
1840 .iter()
1841 .filter_map(|i| self.get_issue(&i.id, None).ok().flatten())
1842 .collect();
1843
1844 Ok(claimed)
1845 }
1846
1847 #[allow(clippy::too_many_arguments)]
1857 pub fn create_checkpoint(
1858 &mut self,
1859 id: &str,
1860 session_id: &str,
1861 name: &str,
1862 description: Option<&str>,
1863 git_status: Option<&str>,
1864 git_branch: Option<&str>,
1865 actor: &str,
1866 ) -> Result<()> {
1867 let now = chrono::Utc::now().timestamp_millis();
1868
1869 self.mutate("create_checkpoint", actor, |tx, ctx| {
1870 tx.execute(
1871 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
1872 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1873 rusqlite::params![id, session_id, name, description, git_status, git_branch, now],
1874 )?;
1875
1876 ctx.record_event("checkpoint", id, EventType::CheckpointCreated);
1877
1878 Ok(())
1879 })
1880 }
1881
1882 pub fn add_checkpoint_item(
1888 &mut self,
1889 checkpoint_id: &str,
1890 context_item_id: &str,
1891 actor: &str,
1892 ) -> Result<()> {
1893 let id = format!("cpitem_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1894 self.mutate("add_checkpoint_item", actor, |tx, _ctx| {
1895 tx.execute(
1896 "INSERT OR IGNORE INTO checkpoint_items (id, checkpoint_id, context_item_id)
1897 VALUES (?1, ?2, ?3)",
1898 rusqlite::params![id, checkpoint_id, context_item_id],
1899 )?;
1900
1901 Ok(())
1902 })
1903 }
1904
1905 pub fn list_checkpoints(
1911 &self,
1912 session_id: &str,
1913 limit: Option<u32>,
1914 ) -> Result<Vec<Checkpoint>> {
1915 let limit = limit.unwrap_or(20);
1916
1917 let mut stmt = self.conn.prepare(
1918 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
1919 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
1920 FROM checkpoints c
1921 WHERE c.session_id = ?1
1922 ORDER BY c.created_at DESC
1923 LIMIT ?2",
1924 )?;
1925
1926 let rows = stmt.query_map(rusqlite::params![session_id, limit], |row| {
1927 Ok(Checkpoint {
1928 id: row.get(0)?,
1929 session_id: row.get(1)?,
1930 name: row.get(2)?,
1931 description: row.get(3)?,
1932 git_status: row.get(4)?,
1933 git_branch: row.get(5)?,
1934 created_at: row.get(6)?,
1935 item_count: row.get(7)?,
1936 })
1937 })?;
1938
1939 rows.collect::<std::result::Result<Vec<_>, _>>()
1940 .map_err(Error::from)
1941 }
1942
1943 pub fn get_checkpoint(&self, id: &str) -> Result<Option<Checkpoint>> {
1949 let mut stmt = self.conn.prepare(
1950 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
1951 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
1952 FROM checkpoints c
1953 WHERE c.id = ?1",
1954 )?;
1955
1956 let checkpoint = stmt
1957 .query_row([id], |row| {
1958 Ok(Checkpoint {
1959 id: row.get(0)?,
1960 session_id: row.get(1)?,
1961 name: row.get(2)?,
1962 description: row.get(3)?,
1963 git_status: row.get(4)?,
1964 git_branch: row.get(5)?,
1965 created_at: row.get(6)?,
1966 item_count: row.get(7)?,
1967 })
1968 })
1969 .optional()?;
1970
1971 Ok(checkpoint)
1972 }
1973
1974 pub fn delete_checkpoint(&mut self, id: &str, actor: &str) -> Result<()> {
1980 self.mutate("delete_checkpoint", actor, |tx, ctx| {
1981 let project_path: Option<Option<String>> = tx
1983 .query_row(
1984 "SELECT s.project_path FROM checkpoints c
1985 JOIN sessions s ON c.session_id = s.id
1986 WHERE c.id = ?1",
1987 [id],
1988 |row| row.get(0),
1989 )
1990 .optional()?;
1991
1992 tx.execute("DELETE FROM checkpoint_items WHERE checkpoint_id = ?1", [id])?;
1994
1995 let rows = tx.execute("DELETE FROM checkpoints WHERE id = ?1", [id])?;
1997
1998 if rows == 0 {
1999 return Err(Error::CheckpointNotFound { id: id.to_string() });
2000 }
2001
2002 ctx.record_event("checkpoint", id, EventType::CheckpointDeleted);
2003
2004 if let Some(Some(path)) = project_path {
2006 let now = chrono::Utc::now().timestamp_millis();
2007 tx.execute(
2008 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2009 VALUES ('checkpoint', ?1, ?2, ?3, ?4, 0)
2010 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2011 deleted_at = excluded.deleted_at,
2012 deleted_by = excluded.deleted_by,
2013 exported = 0",
2014 rusqlite::params![id, path, now, ctx.actor],
2015 )?;
2016 }
2017
2018 Ok(())
2019 })
2020 }
2021
2022 pub fn get_checkpoint_items(&self, checkpoint_id: &str) -> Result<Vec<ContextItem>> {
2028 let mut stmt = self.conn.prepare(
2029 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
2030 ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
2031 FROM context_items ci
2032 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2033 WHERE cpi.checkpoint_id = ?1
2034 ORDER BY ci.priority DESC, ci.created_at DESC",
2035 )?;
2036
2037 let rows = stmt.query_map([checkpoint_id], |row| {
2038 Ok(ContextItem {
2039 id: row.get(0)?,
2040 session_id: row.get(1)?,
2041 key: row.get(2)?,
2042 value: row.get(3)?,
2043 category: row.get(4)?,
2044 priority: row.get(5)?,
2045 channel: row.get(6)?,
2046 tags: row.get(7)?,
2047 size: row.get(8)?,
2048 created_at: row.get(9)?,
2049 updated_at: row.get(10)?,
2050 })
2051 })?;
2052
2053 rows.collect::<std::result::Result<Vec<_>, _>>()
2054 .map_err(Error::from)
2055 }
2056
2057 pub fn restore_checkpoint(
2066 &mut self,
2067 checkpoint_id: &str,
2068 target_session_id: &str,
2069 restore_categories: Option<&[String]>,
2070 restore_tags: Option<&[String]>,
2071 actor: &str,
2072 ) -> Result<usize> {
2073 let mut items = self.get_checkpoint_items(checkpoint_id)?;
2075
2076 if let Some(categories) = restore_categories {
2078 items.retain(|item| categories.contains(&item.category));
2079 }
2080
2081 if let Some(tags) = restore_tags {
2083 items.retain(|item| {
2084 if let Some(ref item_tags) = item.tags {
2086 if let Ok(parsed_tags) = serde_json::from_str::<Vec<String>>(item_tags) {
2087 return tags.iter().any(|t| parsed_tags.contains(t));
2088 }
2089 }
2090 false
2091 });
2092 }
2093
2094 let now = chrono::Utc::now().timestamp_millis();
2095
2096 self.mutate("restore_checkpoint", actor, |tx, ctx| {
2097 tx.execute(
2099 "DELETE FROM context_items WHERE session_id = ?1",
2100 [target_session_id],
2101 )?;
2102
2103 let mut restored = 0;
2105 for item in &items {
2106 let new_id = uuid::Uuid::new_v4().to_string();
2107 let size = item.value.len() as i64;
2108
2109 tx.execute(
2110 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
2111 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)",
2112 rusqlite::params![
2113 new_id,
2114 target_session_id,
2115 item.key,
2116 item.value,
2117 item.category,
2118 item.priority,
2119 item.channel,
2120 item.tags,
2121 size,
2122 now,
2123 ],
2124 )?;
2125
2126 ctx.record_event("context_item", &new_id, EventType::ItemCreated);
2127 restored += 1;
2128 }
2129
2130 Ok(restored)
2131 })
2132 }
2133
2134 pub fn remove_checkpoint_item(
2140 &mut self,
2141 checkpoint_id: &str,
2142 context_item_id: &str,
2143 actor: &str,
2144 ) -> Result<()> {
2145 self.mutate("remove_checkpoint_item", actor, |tx, _ctx| {
2146 tx.execute(
2147 "DELETE FROM checkpoint_items WHERE checkpoint_id = ?1 AND context_item_id = ?2",
2148 rusqlite::params![checkpoint_id, context_item_id],
2149 )?;
2150 Ok(())
2151 })
2152 }
2153
2154 pub fn add_checkpoint_items_by_keys(
2160 &mut self,
2161 checkpoint_id: &str,
2162 session_id: &str,
2163 keys: &[String],
2164 actor: &str,
2165 ) -> Result<usize> {
2166 let mut added = 0;
2167
2168 for key in keys {
2169 let item_id: Option<String> = self.conn.query_row(
2171 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
2172 rusqlite::params![session_id, key],
2173 |row| row.get(0),
2174 ).optional()?;
2175
2176 if let Some(id) = item_id {
2177 self.add_checkpoint_item(checkpoint_id, &id, actor)?;
2178 added += 1;
2179 }
2180 }
2181
2182 Ok(added)
2183 }
2184
2185 pub fn remove_checkpoint_items_by_keys(
2191 &mut self,
2192 checkpoint_id: &str,
2193 keys: &[String],
2194 actor: &str,
2195 ) -> Result<usize> {
2196 let mut removed = 0;
2197
2198 for key in keys {
2199 let item_id: Option<String> = self.conn.query_row(
2201 "SELECT ci.id FROM context_items ci
2202 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2203 WHERE cpi.checkpoint_id = ?1 AND ci.key = ?2",
2204 rusqlite::params![checkpoint_id, key],
2205 |row| row.get(0),
2206 ).optional()?;
2207
2208 if let Some(id) = item_id {
2209 self.remove_checkpoint_item(checkpoint_id, &id, actor)?;
2210 removed += 1;
2211 }
2212 }
2213
2214 Ok(removed)
2215 }
2216
2217 #[allow(clippy::too_many_arguments)]
2227 pub fn save_memory(
2228 &mut self,
2229 id: &str,
2230 project_path: &str,
2231 key: &str,
2232 value: &str,
2233 category: &str,
2234 actor: &str,
2235 ) -> Result<()> {
2236 let now = chrono::Utc::now().timestamp_millis();
2237
2238 self.mutate("save_memory", actor, |tx, ctx| {
2239 tx.execute(
2240 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
2241 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
2242 ON CONFLICT(project_path, key) DO UPDATE SET
2243 value = excluded.value,
2244 category = excluded.category,
2245 updated_at = excluded.updated_at",
2246 rusqlite::params![id, project_path, key, value, category, now],
2247 )?;
2248
2249 ctx.record_event("memory", id, EventType::MemorySaved);
2250
2251 Ok(())
2252 })
2253 }
2254
2255 pub fn get_memory(&self, project_path: &str, key: &str) -> Result<Option<Memory>> {
2261 let mut stmt = self.conn.prepare(
2262 "SELECT id, project_path, key, value, category, created_at, updated_at
2263 FROM project_memory WHERE project_path = ?1 AND key = ?2",
2264 )?;
2265
2266 let memory = stmt
2267 .query_row(rusqlite::params![project_path, key], |row| {
2268 Ok(Memory {
2269 id: row.get(0)?,
2270 project_path: row.get(1)?,
2271 key: row.get(2)?,
2272 value: row.get(3)?,
2273 category: row.get(4)?,
2274 created_at: row.get(5)?,
2275 updated_at: row.get(6)?,
2276 })
2277 })
2278 .optional()?;
2279
2280 Ok(memory)
2281 }
2282
2283 pub fn list_memory(
2289 &self,
2290 project_path: &str,
2291 category: Option<&str>,
2292 ) -> Result<Vec<Memory>> {
2293 let map_row = |row: &rusqlite::Row| -> rusqlite::Result<Memory> {
2294 Ok(Memory {
2295 id: row.get(0)?,
2296 project_path: row.get(1)?,
2297 key: row.get(2)?,
2298 value: row.get(3)?,
2299 category: row.get(4)?,
2300 created_at: row.get(5)?,
2301 updated_at: row.get(6)?,
2302 })
2303 };
2304
2305 let rows = if let Some(cat) = category {
2306 let mut stmt = self.conn.prepare(
2307 "SELECT id, project_path, key, value, category, created_at, updated_at
2308 FROM project_memory WHERE project_path = ?1 AND category = ?2
2309 ORDER BY key ASC",
2310 )?;
2311 stmt.query_map(rusqlite::params![project_path, cat], map_row)?
2312 .collect::<std::result::Result<Vec<_>, _>>()
2313 } else {
2314 let mut stmt = self.conn.prepare(
2315 "SELECT id, project_path, key, value, category, created_at, updated_at
2316 FROM project_memory WHERE project_path = ?1
2317 ORDER BY key ASC",
2318 )?;
2319 stmt.query_map(rusqlite::params![project_path], map_row)?
2320 .collect::<std::result::Result<Vec<_>, _>>()
2321 };
2322
2323 rows.map_err(Error::from)
2324 }
2325
2326 pub fn delete_memory(
2332 &mut self,
2333 project_path: &str,
2334 key: &str,
2335 actor: &str,
2336 ) -> Result<()> {
2337 let proj_path = project_path.to_string();
2338 self.mutate("delete_memory", actor, |tx, ctx| {
2339 let id: Option<String> = tx
2341 .query_row(
2342 "SELECT id FROM project_memory WHERE project_path = ?1 AND key = ?2",
2343 rusqlite::params![proj_path, key],
2344 |row| row.get(0),
2345 )
2346 .optional()?;
2347
2348 let rows = tx.execute(
2349 "DELETE FROM project_memory WHERE project_path = ?1 AND key = ?2",
2350 rusqlite::params![proj_path, key],
2351 )?;
2352
2353 if rows > 0 {
2354 if let Some(ref mem_id) = id {
2355 ctx.record_event("memory", mem_id, EventType::MemoryDeleted);
2356
2357 let now = chrono::Utc::now().timestamp_millis();
2359 tx.execute(
2360 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2361 VALUES ('memory', ?1, ?2, ?3, ?4, 0)
2362 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2363 deleted_at = excluded.deleted_at,
2364 deleted_by = excluded.deleted_by,
2365 exported = 0",
2366 rusqlite::params![mem_id, proj_path, now, ctx.actor],
2367 )?;
2368 }
2369 }
2370
2371 Ok(())
2372 })
2373 }
2374
2375 pub fn get_dirty_sessions(&self) -> Result<Vec<String>> {
2385 let mut stmt = self.conn.prepare(
2386 "SELECT session_id FROM dirty_sessions ORDER BY marked_at ASC",
2387 )?;
2388 let rows = stmt.query_map([], |row| row.get(0))?;
2389 rows.collect::<std::result::Result<Vec<_>, _>>()
2390 .map_err(Error::from)
2391 }
2392
2393 pub fn get_dirty_issues(&self) -> Result<Vec<String>> {
2399 let mut stmt = self.conn.prepare(
2400 "SELECT issue_id FROM dirty_issues ORDER BY marked_at ASC",
2401 )?;
2402 let rows = stmt.query_map([], |row| row.get(0))?;
2403 rows.collect::<std::result::Result<Vec<_>, _>>()
2404 .map_err(Error::from)
2405 }
2406
2407 pub fn get_dirty_context_items(&self) -> Result<Vec<String>> {
2413 let mut stmt = self.conn.prepare(
2414 "SELECT item_id FROM dirty_context_items ORDER BY marked_at ASC",
2415 )?;
2416 let rows = stmt.query_map([], |row| row.get(0))?;
2417 rows.collect::<std::result::Result<Vec<_>, _>>()
2418 .map_err(Error::from)
2419 }
2420
2421 pub fn clear_dirty_sessions(&mut self, ids: &[String]) -> Result<()> {
2427 if ids.is_empty() {
2428 return Ok(());
2429 }
2430 let placeholders = vec!["?"; ids.len()].join(",");
2431 let sql = format!("DELETE FROM dirty_sessions WHERE session_id IN ({placeholders})");
2432 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2433 self.conn.execute(&sql, params.as_slice())?;
2434 Ok(())
2435 }
2436
2437 pub fn clear_dirty_issues(&mut self, ids: &[String]) -> Result<()> {
2443 if ids.is_empty() {
2444 return Ok(());
2445 }
2446 let placeholders = vec!["?"; ids.len()].join(",");
2447 let sql = format!("DELETE FROM dirty_issues WHERE issue_id IN ({placeholders})");
2448 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2449 self.conn.execute(&sql, params.as_slice())?;
2450 Ok(())
2451 }
2452
2453 pub fn clear_dirty_context_items(&mut self, ids: &[String]) -> Result<()> {
2459 if ids.is_empty() {
2460 return Ok(());
2461 }
2462 let placeholders = vec!["?"; ids.len()].join(",");
2463 let sql = format!("DELETE FROM dirty_context_items WHERE item_id IN ({placeholders})");
2464 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2465 self.conn.execute(&sql, params.as_slice())?;
2466 Ok(())
2467 }
2468
2469 pub fn get_export_hash(&self, entity_type: &str, entity_id: &str) -> Result<Option<String>> {
2475 let mut stmt = self.conn.prepare(
2476 "SELECT content_hash FROM export_hashes WHERE entity_type = ?1 AND entity_id = ?2",
2477 )?;
2478 let hash = stmt
2479 .query_row(rusqlite::params![entity_type, entity_id], |row| row.get(0))
2480 .optional()?;
2481 Ok(hash)
2482 }
2483
2484 pub fn set_export_hash(&mut self, entity_type: &str, entity_id: &str, hash: &str) -> Result<()> {
2490 let now = chrono::Utc::now().timestamp_millis();
2491 self.conn.execute(
2492 "INSERT INTO export_hashes (entity_type, entity_id, content_hash, exported_at)
2493 VALUES (?1, ?2, ?3, ?4)
2494 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2495 content_hash = excluded.content_hash,
2496 exported_at = excluded.exported_at",
2497 rusqlite::params![entity_type, entity_id, hash, now],
2498 )?;
2499 Ok(())
2500 }
2501
2502 pub fn record_deletion(
2515 &mut self,
2516 entity_type: &str,
2517 entity_id: &str,
2518 project_path: &str,
2519 actor: &str,
2520 ) -> Result<()> {
2521 let now = chrono::Utc::now().timestamp_millis();
2522 self.conn.execute(
2523 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2524 VALUES (?1, ?2, ?3, ?4, ?5, 0)
2525 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2526 deleted_at = excluded.deleted_at,
2527 deleted_by = excluded.deleted_by,
2528 exported = 0",
2529 rusqlite::params![entity_type, entity_id, project_path, now, actor],
2530 )?;
2531 Ok(())
2532 }
2533
2534 pub fn get_pending_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2540 let mut stmt = self.conn.prepare(
2541 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2542 FROM sync_deletions
2543 WHERE project_path = ?1 AND exported = 0
2544 ORDER BY deleted_at ASC",
2545 )?;
2546 let rows = stmt.query_map([project_path], |row| {
2547 Ok(SyncDeletion {
2548 id: row.get(0)?,
2549 entity_type: row.get(1)?,
2550 entity_id: row.get(2)?,
2551 project_path: row.get(3)?,
2552 deleted_at: row.get(4)?,
2553 deleted_by: row.get(5)?,
2554 })
2555 })?;
2556 rows.collect::<std::result::Result<Vec<_>, _>>()
2557 .map_err(Error::from)
2558 }
2559
2560 pub fn get_all_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2566 let mut stmt = self.conn.prepare(
2567 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2568 FROM sync_deletions
2569 WHERE project_path = ?1
2570 ORDER BY deleted_at ASC",
2571 )?;
2572 let rows = stmt.query_map([project_path], |row| {
2573 Ok(SyncDeletion {
2574 id: row.get(0)?,
2575 entity_type: row.get(1)?,
2576 entity_id: row.get(2)?,
2577 project_path: row.get(3)?,
2578 deleted_at: row.get(4)?,
2579 deleted_by: row.get(5)?,
2580 })
2581 })?;
2582 rows.collect::<std::result::Result<Vec<_>, _>>()
2583 .map_err(Error::from)
2584 }
2585
2586 pub fn mark_deletions_exported(&mut self, ids: &[i64]) -> Result<()> {
2592 if ids.is_empty() {
2593 return Ok(());
2594 }
2595 let placeholders = vec!["?"; ids.len()].join(",");
2596 let sql = format!("UPDATE sync_deletions SET exported = 1 WHERE id IN ({placeholders})");
2597 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2598 self.conn.execute(&sql, params.as_slice())?;
2599 Ok(())
2600 }
2601
2602 pub fn count_pending_deletions(&self, project_path: &str) -> Result<usize> {
2608 let count: i64 = self.conn.query_row(
2609 "SELECT COUNT(*) FROM sync_deletions WHERE project_path = ?1 AND exported = 0",
2610 [project_path],
2611 |row| row.get(0),
2612 )?;
2613 Ok(count as usize)
2614 }
2615
2616 pub fn apply_deletion(&mut self, entity_type: &str, entity_id: &str) -> Result<bool> {
2622 let sql = match entity_type {
2623 "session" => "DELETE FROM sessions WHERE id = ?1",
2624 "issue" => "DELETE FROM issues WHERE id = ?1",
2625 "context_item" => "DELETE FROM context_items WHERE id = ?1",
2626 "memory" => "DELETE FROM project_memory WHERE id = ?1",
2627 "checkpoint" => "DELETE FROM checkpoints WHERE id = ?1",
2628 _ => return Ok(false),
2629 };
2630 let rows = self.conn.execute(sql, [entity_id])?;
2631 Ok(rows > 0)
2632 }
2633
2634 pub fn get_all_sessions(&self) -> Result<Vec<Session>> {
2640 let mut stmt = self.conn.prepare(
2641 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
2642 FROM sessions ORDER BY created_at ASC",
2643 )?;
2644 let rows = stmt.query_map([], |row| {
2645 Ok(Session {
2646 id: row.get(0)?,
2647 name: row.get(1)?,
2648 description: row.get(2)?,
2649 branch: row.get(3)?,
2650 channel: row.get(4)?,
2651 project_path: row.get(5)?,
2652 status: row.get(6)?,
2653 ended_at: row.get(7)?,
2654 created_at: row.get(8)?,
2655 updated_at: row.get(9)?,
2656 })
2657 })?;
2658 rows.collect::<std::result::Result<Vec<_>, _>>()
2659 .map_err(Error::from)
2660 }
2661
2662 pub fn get_all_issues(&self) -> Result<Vec<Issue>> {
2668 let mut stmt = self.conn.prepare(
2669 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
2670 FROM issues ORDER BY created_at ASC",
2671 )?;
2672 let rows = stmt.query_map([], map_issue_row)?;
2673 rows.collect::<std::result::Result<Vec<_>, _>>()
2674 .map_err(Error::from)
2675 }
2676
2677 pub fn get_all_context_items(
2683 &self,
2684 category: Option<&str>,
2685 priority: Option<&str>,
2686 limit: Option<u32>,
2687 ) -> Result<Vec<ContextItem>> {
2688 let mut sql = String::from(
2689 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
2690 FROM context_items WHERE 1=1",
2691 );
2692
2693 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
2694
2695 if let Some(cat) = category {
2696 sql.push_str(" AND category = ?");
2697 params.push(Box::new(cat.to_string()));
2698 }
2699
2700 if let Some(pri) = priority {
2701 sql.push_str(" AND priority = ?");
2702 params.push(Box::new(pri.to_string()));
2703 }
2704
2705 sql.push_str(" ORDER BY created_at DESC");
2706 if let Some(lim) = limit {
2707 sql.push_str(" LIMIT ?");
2708 params.push(Box::new(lim));
2709 }
2710
2711 let mut stmt = self.conn.prepare(&sql)?;
2712 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
2713
2714 let rows = stmt.query_map(params_refs.as_slice(), |row| {
2715 Ok(ContextItem {
2716 id: row.get(0)?,
2717 session_id: row.get(1)?,
2718 key: row.get(2)?,
2719 value: row.get(3)?,
2720 category: row.get(4)?,
2721 priority: row.get(5)?,
2722 channel: row.get(6)?,
2723 tags: row.get(7)?,
2724 size: row.get(8)?,
2725 created_at: row.get(9)?,
2726 updated_at: row.get(10)?,
2727 })
2728 })?;
2729 rows.collect::<std::result::Result<Vec<_>, _>>()
2730 .map_err(Error::from)
2731 }
2732
2733 pub fn get_all_memory(&self) -> Result<Vec<Memory>> {
2739 let mut stmt = self.conn.prepare(
2740 "SELECT id, project_path, key, value, category, created_at, updated_at
2741 FROM project_memory ORDER BY created_at ASC",
2742 )?;
2743 let rows = stmt.query_map([], |row| {
2744 Ok(Memory {
2745 id: row.get(0)?,
2746 project_path: row.get(1)?,
2747 key: row.get(2)?,
2748 value: row.get(3)?,
2749 category: row.get(4)?,
2750 created_at: row.get(5)?,
2751 updated_at: row.get(6)?,
2752 })
2753 })?;
2754 rows.collect::<std::result::Result<Vec<_>, _>>()
2755 .map_err(Error::from)
2756 }
2757
2758 pub fn get_all_issue_short_ids(&self) -> Result<Vec<String>> {
2763 let mut stmt = self
2764 .conn
2765 .prepare("SELECT short_id FROM issues WHERE short_id IS NOT NULL")?;
2766 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
2767 rows.collect::<std::result::Result<Vec<_>, _>>()
2768 .map_err(Error::from)
2769 }
2770
2771 pub fn get_all_session_ids(&self) -> Result<Vec<String>> {
2773 let mut stmt = self.conn.prepare("SELECT id FROM sessions")?;
2774 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
2775 rows.collect::<std::result::Result<Vec<_>, _>>()
2776 .map_err(Error::from)
2777 }
2778
2779 pub fn get_all_checkpoint_ids(&self) -> Result<Vec<String>> {
2781 let mut stmt = self.conn.prepare("SELECT id FROM checkpoints")?;
2782 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
2783 rows.collect::<std::result::Result<Vec<_>, _>>()
2784 .map_err(Error::from)
2785 }
2786
2787 pub fn get_all_checkpoints(&self) -> Result<Vec<Checkpoint>> {
2793 let mut stmt = self.conn.prepare(
2794 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2795 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2796 FROM checkpoints c ORDER BY c.created_at ASC",
2797 )?;
2798 let rows = stmt.query_map([], |row| {
2799 Ok(Checkpoint {
2800 id: row.get(0)?,
2801 session_id: row.get(1)?,
2802 name: row.get(2)?,
2803 description: row.get(3)?,
2804 git_status: row.get(4)?,
2805 git_branch: row.get(5)?,
2806 created_at: row.get(6)?,
2807 item_count: row.get(7)?,
2808 })
2809 })?;
2810 rows.collect::<std::result::Result<Vec<_>, _>>()
2811 .map_err(Error::from)
2812 }
2813
2814 pub fn get_context_item(&self, id: &str) -> Result<Option<ContextItem>> {
2820 let mut stmt = self.conn.prepare(
2821 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
2822 FROM context_items WHERE id = ?1",
2823 )?;
2824 let item = stmt
2825 .query_row([id], |row| {
2826 Ok(ContextItem {
2827 id: row.get(0)?,
2828 session_id: row.get(1)?,
2829 key: row.get(2)?,
2830 value: row.get(3)?,
2831 category: row.get(4)?,
2832 priority: row.get(5)?,
2833 channel: row.get(6)?,
2834 tags: row.get(7)?,
2835 size: row.get(8)?,
2836 created_at: row.get(9)?,
2837 updated_at: row.get(10)?,
2838 })
2839 })
2840 .optional()?;
2841 Ok(item)
2842 }
2843
2844 pub fn get_sessions_by_project(&self, project_path: &str) -> Result<Vec<Session>> {
2854 let mut stmt = self.conn.prepare(
2855 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
2856 FROM sessions WHERE project_path = ?1 ORDER BY created_at ASC",
2857 )?;
2858 let rows = stmt.query_map([project_path], |row| {
2859 Ok(Session {
2860 id: row.get(0)?,
2861 name: row.get(1)?,
2862 description: row.get(2)?,
2863 branch: row.get(3)?,
2864 channel: row.get(4)?,
2865 project_path: row.get(5)?,
2866 status: row.get(6)?,
2867 ended_at: row.get(7)?,
2868 created_at: row.get(8)?,
2869 updated_at: row.get(9)?,
2870 })
2871 })?;
2872 rows.collect::<std::result::Result<Vec<_>, _>>()
2873 .map_err(Error::from)
2874 }
2875
2876 pub fn get_issues_by_project(&self, project_path: &str) -> Result<Vec<Issue>> {
2882 let mut stmt = self.conn.prepare(
2883 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
2884 FROM issues WHERE project_path = ?1 ORDER BY created_at ASC",
2885 )?;
2886 let rows = stmt.query_map([project_path], map_issue_row)?;
2887 rows.collect::<std::result::Result<Vec<_>, _>>()
2888 .map_err(Error::from)
2889 }
2890
2891 pub fn get_context_items_by_project(&self, project_path: &str) -> Result<Vec<ContextItem>> {
2900 let mut stmt = self.conn.prepare(
2901 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority, ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
2902 FROM context_items ci
2903 INNER JOIN sessions s ON ci.session_id = s.id
2904 WHERE s.project_path = ?1
2905 ORDER BY ci.created_at ASC",
2906 )?;
2907 let rows = stmt.query_map([project_path], |row| {
2908 Ok(ContextItem {
2909 id: row.get(0)?,
2910 session_id: row.get(1)?,
2911 key: row.get(2)?,
2912 value: row.get(3)?,
2913 category: row.get(4)?,
2914 priority: row.get(5)?,
2915 channel: row.get(6)?,
2916 tags: row.get(7)?,
2917 size: row.get(8)?,
2918 created_at: row.get(9)?,
2919 updated_at: row.get(10)?,
2920 })
2921 })?;
2922 rows.collect::<std::result::Result<Vec<_>, _>>()
2923 .map_err(Error::from)
2924 }
2925
2926 pub fn get_memory_by_project(&self, project_path: &str) -> Result<Vec<Memory>> {
2932 let mut stmt = self.conn.prepare(
2933 "SELECT id, project_path, key, value, category, created_at, updated_at
2934 FROM project_memory WHERE project_path = ?1 ORDER BY created_at ASC",
2935 )?;
2936 let rows = stmt.query_map([project_path], |row| {
2937 Ok(Memory {
2938 id: row.get(0)?,
2939 project_path: row.get(1)?,
2940 key: row.get(2)?,
2941 value: row.get(3)?,
2942 category: row.get(4)?,
2943 created_at: row.get(5)?,
2944 updated_at: row.get(6)?,
2945 })
2946 })?;
2947 rows.collect::<std::result::Result<Vec<_>, _>>()
2948 .map_err(Error::from)
2949 }
2950
2951 pub fn get_checkpoints_by_project(&self, project_path: &str) -> Result<Vec<Checkpoint>> {
2960 let mut stmt = self.conn.prepare(
2961 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2962 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2963 FROM checkpoints c
2964 INNER JOIN sessions s ON c.session_id = s.id
2965 WHERE s.project_path = ?1
2966 ORDER BY c.created_at ASC",
2967 )?;
2968 let rows = stmt.query_map([project_path], |row| {
2969 Ok(Checkpoint {
2970 id: row.get(0)?,
2971 session_id: row.get(1)?,
2972 name: row.get(2)?,
2973 description: row.get(3)?,
2974 git_status: row.get(4)?,
2975 git_branch: row.get(5)?,
2976 created_at: row.get(6)?,
2977 item_count: row.get(7)?,
2978 })
2979 })?;
2980 rows.collect::<std::result::Result<Vec<_>, _>>()
2981 .map_err(Error::from)
2982 }
2983
2984 pub fn get_dirty_sessions_by_project(&self, project_path: &str) -> Result<Vec<String>> {
2990 let mut stmt = self.conn.prepare(
2991 "SELECT ds.session_id
2992 FROM dirty_sessions ds
2993 INNER JOIN sessions s ON ds.session_id = s.id
2994 WHERE s.project_path = ?1",
2995 )?;
2996 let rows = stmt.query_map([project_path], |row| row.get(0))?;
2997 rows.collect::<std::result::Result<Vec<_>, _>>()
2998 .map_err(Error::from)
2999 }
3000
3001 pub fn get_dirty_issues_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3007 let mut stmt = self.conn.prepare(
3008 "SELECT di.issue_id
3009 FROM dirty_issues di
3010 INNER JOIN issues i ON di.issue_id = i.id
3011 WHERE i.project_path = ?1",
3012 )?;
3013 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3014 rows.collect::<std::result::Result<Vec<_>, _>>()
3015 .map_err(Error::from)
3016 }
3017
3018 pub fn get_dirty_context_items_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3024 let mut stmt = self.conn.prepare(
3025 "SELECT dci.item_id
3026 FROM dirty_context_items dci
3027 INNER JOIN context_items ci ON dci.item_id = ci.id
3028 INNER JOIN sessions s ON ci.session_id = s.id
3029 WHERE s.project_path = ?1",
3030 )?;
3031 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3032 rows.collect::<std::result::Result<Vec<_>, _>>()
3033 .map_err(Error::from)
3034 }
3035
3036 pub fn backfill_dirty_for_project(&mut self, project_path: &str) -> Result<BackfillStats> {
3046 let now = chrono::Utc::now().timestamp_millis();
3047
3048 let sessions_count = self.conn.execute(
3050 "INSERT OR IGNORE INTO dirty_sessions (session_id, marked_at)
3051 SELECT id, ?1 FROM sessions WHERE project_path = ?2",
3052 rusqlite::params![now, project_path],
3053 )?;
3054
3055 let issues_count = self.conn.execute(
3057 "INSERT OR IGNORE INTO dirty_issues (issue_id, marked_at)
3058 SELECT id, ?1 FROM issues WHERE project_path = ?2",
3059 rusqlite::params![now, project_path],
3060 )?;
3061
3062 let context_items_count = self.conn.execute(
3064 "INSERT OR IGNORE INTO dirty_context_items (item_id, marked_at)
3065 SELECT ci.id, ?1 FROM context_items ci
3066 INNER JOIN sessions s ON ci.session_id = s.id
3067 WHERE s.project_path = ?2",
3068 rusqlite::params![now, project_path],
3069 )?;
3070
3071 Ok(BackfillStats {
3072 sessions: sessions_count,
3073 issues: issues_count,
3074 context_items: context_items_count,
3075 })
3076 }
3077
3078 pub fn get_project_counts(&self, project_path: &str) -> Result<ProjectCounts> {
3084 let sessions: i64 = self.conn.query_row(
3085 "SELECT COUNT(*) FROM sessions WHERE project_path = ?1",
3086 [project_path],
3087 |row| row.get(0),
3088 )?;
3089
3090 let issues: i64 = self.conn.query_row(
3091 "SELECT COUNT(*) FROM issues WHERE project_path = ?1",
3092 [project_path],
3093 |row| row.get(0),
3094 )?;
3095
3096 let context_items: i64 = self.conn.query_row(
3097 "SELECT COUNT(*) FROM context_items ci
3098 INNER JOIN sessions s ON ci.session_id = s.id
3099 WHERE s.project_path = ?1",
3100 [project_path],
3101 |row| row.get(0),
3102 )?;
3103
3104 let memories: i64 = self.conn.query_row(
3105 "SELECT COUNT(*) FROM project_memory WHERE project_path = ?1",
3106 [project_path],
3107 |row| row.get(0),
3108 )?;
3109
3110 let checkpoints: i64 = self.conn.query_row(
3111 "SELECT COUNT(*) FROM checkpoints c
3112 INNER JOIN sessions s ON c.session_id = s.id
3113 WHERE s.project_path = ?1",
3114 [project_path],
3115 |row| row.get(0),
3116 )?;
3117
3118 Ok(ProjectCounts {
3119 sessions: sessions as usize,
3120 issues: issues as usize,
3121 context_items: context_items as usize,
3122 memories: memories as usize,
3123 checkpoints: checkpoints as usize,
3124 })
3125 }
3126
3127 pub fn upsert_session(&mut self, session: &Session) -> Result<()> {
3139 self.conn.execute(
3140 "INSERT INTO sessions (id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at)
3141 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
3142 ON CONFLICT(id) DO UPDATE SET
3143 name = excluded.name,
3144 description = excluded.description,
3145 branch = excluded.branch,
3146 channel = excluded.channel,
3147 project_path = excluded.project_path,
3148 status = excluded.status,
3149 ended_at = excluded.ended_at,
3150 updated_at = excluded.updated_at",
3151 rusqlite::params![
3152 session.id,
3153 session.name,
3154 session.description,
3155 session.branch,
3156 session.channel,
3157 session.project_path,
3158 session.status,
3159 session.ended_at,
3160 session.created_at,
3161 session.updated_at,
3162 ],
3163 )?;
3164 Ok(())
3165 }
3166
3167 pub fn upsert_issue(&mut self, issue: &Issue) -> Result<()> {
3173 self.conn.execute(
3174 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at)
3175 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
3176 ON CONFLICT(id) DO UPDATE SET
3177 short_id = excluded.short_id,
3178 project_path = excluded.project_path,
3179 title = excluded.title,
3180 description = excluded.description,
3181 details = excluded.details,
3182 status = excluded.status,
3183 priority = excluded.priority,
3184 issue_type = excluded.issue_type,
3185 plan_id = excluded.plan_id,
3186 assigned_to_agent = excluded.assigned_to_agent,
3187 updated_at = excluded.updated_at,
3188 closed_at = excluded.closed_at",
3189 rusqlite::params![
3190 issue.id,
3191 issue.short_id,
3192 issue.project_path,
3193 issue.title,
3194 issue.description,
3195 issue.details,
3196 issue.status,
3197 issue.priority,
3198 issue.issue_type,
3199 issue.plan_id,
3200 issue.created_by_agent,
3201 issue.assigned_to_agent,
3202 issue.created_at,
3203 issue.updated_at,
3204 issue.closed_at,
3205 ],
3206 )?;
3207 Ok(())
3208 }
3209
3210 pub fn upsert_context_item(&mut self, item: &ContextItem) -> Result<()> {
3216 self.conn.execute(
3217 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
3218 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3219 ON CONFLICT(id) DO UPDATE SET
3220 key = excluded.key,
3221 value = excluded.value,
3222 category = excluded.category,
3223 priority = excluded.priority,
3224 channel = excluded.channel,
3225 tags = excluded.tags,
3226 size = excluded.size,
3227 updated_at = excluded.updated_at",
3228 rusqlite::params![
3229 item.id,
3230 item.session_id,
3231 item.key,
3232 item.value,
3233 item.category,
3234 item.priority,
3235 item.channel,
3236 item.tags,
3237 item.size,
3238 item.created_at,
3239 item.updated_at,
3240 ],
3241 )?;
3242 Ok(())
3243 }
3244
3245 pub fn upsert_memory(&mut self, memory: &Memory) -> Result<()> {
3251 self.conn.execute(
3252 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
3253 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3254 ON CONFLICT(id) DO UPDATE SET
3255 key = excluded.key,
3256 value = excluded.value,
3257 category = excluded.category,
3258 updated_at = excluded.updated_at",
3259 rusqlite::params![
3260 memory.id,
3261 memory.project_path,
3262 memory.key,
3263 memory.value,
3264 memory.category,
3265 memory.created_at,
3266 memory.updated_at,
3267 ],
3268 )?;
3269 Ok(())
3270 }
3271
3272 pub fn upsert_checkpoint(&mut self, checkpoint: &Checkpoint) -> Result<()> {
3280 self.conn.execute(
3281 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
3282 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3283 ON CONFLICT(id) DO UPDATE SET
3284 name = excluded.name,
3285 description = excluded.description,
3286 git_status = excluded.git_status,
3287 git_branch = excluded.git_branch",
3288 rusqlite::params![
3289 checkpoint.id,
3290 checkpoint.session_id,
3291 checkpoint.name,
3292 checkpoint.description,
3293 checkpoint.git_status,
3294 checkpoint.git_branch,
3295 checkpoint.created_at,
3296 ],
3297 )?;
3298 Ok(())
3299 }
3300
3301 pub fn create_project(&mut self, project: &Project, actor: &str) -> Result<()> {
3311 self.mutate("create_project", actor, |tx, ctx| {
3312 tx.execute(
3313 "INSERT INTO projects (id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at)
3314 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
3315 rusqlite::params![
3316 project.id,
3317 project.project_path,
3318 project.name,
3319 project.description,
3320 project.issue_prefix,
3321 project.next_issue_number,
3322 project.plan_prefix,
3323 project.next_plan_number,
3324 project.created_at,
3325 project.updated_at,
3326 ],
3327 )?;
3328
3329 ctx.record_event("project", &project.id, EventType::ProjectCreated);
3330 Ok(())
3331 })
3332 }
3333
3334 pub fn get_project(&self, id: &str) -> Result<Option<Project>> {
3340 let project = self
3341 .conn
3342 .query_row(
3343 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3344 FROM projects WHERE id = ?1",
3345 [id],
3346 map_project_row,
3347 )
3348 .optional()?;
3349 Ok(project)
3350 }
3351
3352 pub fn get_project_by_path(&self, project_path: &str) -> Result<Option<Project>> {
3358 let project = self
3359 .conn
3360 .query_row(
3361 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3362 FROM projects WHERE project_path = ?1",
3363 [project_path],
3364 map_project_row,
3365 )
3366 .optional()?;
3367 Ok(project)
3368 }
3369
3370 pub fn list_projects(&self, limit: usize) -> Result<Vec<Project>> {
3376 let mut stmt = self.conn.prepare(
3377 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3378 FROM projects
3379 ORDER BY updated_at DESC
3380 LIMIT ?1",
3381 )?;
3382
3383 let projects = stmt
3384 .query_map([limit], map_project_row)?
3385 .collect::<std::result::Result<Vec<_>, _>>()?;
3386
3387 Ok(projects)
3388 }
3389
3390 pub fn update_project(
3396 &mut self,
3397 id: &str,
3398 name: Option<&str>,
3399 description: Option<&str>,
3400 issue_prefix: Option<&str>,
3401 actor: &str,
3402 ) -> Result<()> {
3403 self.mutate("update_project", actor, |tx, ctx| {
3404 let now = chrono::Utc::now().timestamp_millis();
3405
3406 let mut updates = vec!["updated_at = ?1"];
3408 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3409 let mut param_idx = 2;
3410
3411 if let Some(n) = name {
3412 updates.push(format!("name = ?{param_idx}").leak());
3413 params.push(Box::new(n.to_string()));
3414 param_idx += 1;
3415 }
3416
3417 if let Some(d) = description {
3418 updates.push(format!("description = ?{param_idx}").leak());
3419 params.push(Box::new(d.to_string()));
3420 param_idx += 1;
3421 }
3422
3423 if let Some(p) = issue_prefix {
3424 updates.push(format!("issue_prefix = ?{param_idx}").leak());
3425 params.push(Box::new(p.to_string()));
3426 param_idx += 1;
3427 }
3428
3429 params.push(Box::new(id.to_string()));
3431
3432 let sql = format!(
3433 "UPDATE projects SET {} WHERE id = ?{}",
3434 updates.join(", "),
3435 param_idx
3436 );
3437
3438 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3439 let affected = tx.execute(&sql, param_refs.as_slice())?;
3440
3441 if affected == 0 {
3442 return Err(Error::ProjectNotFound { id: id.to_string() });
3443 }
3444
3445 ctx.record_event("project", id, EventType::ProjectUpdated);
3446 Ok(())
3447 })
3448 }
3449
3450 pub fn delete_project(&mut self, id: &str, actor: &str) -> Result<()> {
3462 self.mutate("delete_project", actor, |tx, ctx| {
3463 let project_path: Option<String> = tx
3465 .query_row(
3466 "SELECT project_path FROM projects WHERE id = ?1",
3467 [id],
3468 |row| row.get(0),
3469 )
3470 .optional()?;
3471
3472 let project_path = project_path.ok_or_else(|| Error::ProjectNotFound { id: id.to_string() })?;
3473
3474 tx.execute(
3476 "DELETE FROM sessions WHERE project_path = ?1",
3477 [&project_path],
3478 )?;
3479
3480 tx.execute(
3482 "DELETE FROM issues WHERE project_path = ?1",
3483 [&project_path],
3484 )?;
3485
3486 tx.execute(
3488 "DELETE FROM plans WHERE project_path = ?1",
3489 [&project_path],
3490 )?;
3491
3492 tx.execute(
3494 "DELETE FROM project_memory WHERE project_path = ?1",
3495 [&project_path],
3496 )?;
3497
3498 let affected = tx.execute("DELETE FROM projects WHERE id = ?1", [id])?;
3500
3501 if affected == 0 {
3502 return Err(Error::ProjectNotFound { id: id.to_string() });
3503 }
3504
3505 ctx.record_event("project", id, EventType::ProjectDeleted);
3506 Ok(())
3507 })
3508 }
3509
3510 pub fn get_or_create_project(&mut self, project_path: &str, actor: &str) -> Result<Project> {
3519 if let Some(project) = self.get_project_by_path(project_path)? {
3521 return Ok(project);
3522 }
3523
3524 let name = std::path::Path::new(project_path)
3526 .file_name()
3527 .and_then(|n| n.to_str())
3528 .unwrap_or("Unknown Project")
3529 .to_string();
3530
3531 let project = Project::new(project_path.to_string(), name);
3532 self.create_project(&project, actor)?;
3533 Ok(project)
3534 }
3535
3536 pub fn get_next_issue_number(&mut self, project_path: &str) -> Result<i32> {
3542 let project = self
3543 .get_project_by_path(project_path)?
3544 .ok_or_else(|| Error::ProjectNotFound { id: project_path.to_string() })?;
3545
3546 let next_num = project.next_issue_number;
3547
3548 self.conn.execute(
3550 "UPDATE projects SET next_issue_number = next_issue_number + 1, updated_at = ?1 WHERE project_path = ?2",
3551 rusqlite::params![chrono::Utc::now().timestamp_millis(), project_path],
3552 )?;
3553
3554 Ok(next_num)
3555 }
3556
3557 pub fn create_plan(&mut self, plan: &Plan, actor: &str) -> Result<()> {
3567 self.mutate("create_plan", actor, |tx, ctx| {
3568 tx.execute(
3569 "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, created_in_session, created_at, updated_at)
3570 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
3571 rusqlite::params![
3572 plan.id,
3573 plan.short_id,
3574 plan.project_id,
3575 plan.project_path,
3576 plan.title,
3577 plan.content,
3578 plan.status.as_str(),
3579 plan.success_criteria,
3580 plan.created_in_session,
3581 plan.created_at,
3582 plan.updated_at,
3583 ],
3584 )?;
3585
3586 ctx.record_event("plan", &plan.id, EventType::PlanCreated);
3587 Ok(())
3588 })
3589 }
3590
3591 pub fn get_plan(&self, id: &str) -> Result<Option<Plan>> {
3597 let plan = self
3598 .conn
3599 .query_row(
3600 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, created_in_session, completed_in_session, created_at, updated_at, completed_at
3601 FROM plans WHERE id = ?1",
3602 [id],
3603 map_plan_row,
3604 )
3605 .optional()?;
3606 Ok(plan)
3607 }
3608
3609 pub fn list_plans(&self, project_path: &str, status: Option<&str>, limit: usize) -> Result<Vec<Plan>> {
3615 let sql = if let Some(status) = status {
3616 if status == "all" {
3617 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, created_in_session, completed_in_session, created_at, updated_at, completed_at
3618 FROM plans WHERE project_path = ?1
3619 ORDER BY updated_at DESC
3620 LIMIT ?2".to_string()
3621 } else {
3622 format!(
3623 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, created_in_session, completed_in_session, created_at, updated_at, completed_at
3624 FROM plans WHERE project_path = ?1 AND status = '{}'
3625 ORDER BY updated_at DESC
3626 LIMIT ?2",
3627 status
3628 )
3629 }
3630 } else {
3631 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, created_in_session, completed_in_session, created_at, updated_at, completed_at
3633 FROM plans WHERE project_path = ?1 AND status = 'active'
3634 ORDER BY updated_at DESC
3635 LIMIT ?2".to_string()
3636 };
3637
3638 let mut stmt = self.conn.prepare(&sql)?;
3639 let plans = stmt
3640 .query_map(rusqlite::params![project_path, limit], map_plan_row)?
3641 .collect::<std::result::Result<Vec<_>, _>>()?;
3642
3643 Ok(plans)
3644 }
3645
3646 pub fn update_plan(
3652 &mut self,
3653 id: &str,
3654 title: Option<&str>,
3655 content: Option<&str>,
3656 status: Option<&str>,
3657 success_criteria: Option<&str>,
3658 actor: &str,
3659 ) -> Result<()> {
3660 self.mutate("update_plan", actor, |tx, ctx| {
3661 let now = chrono::Utc::now().timestamp_millis();
3662
3663 let mut updates = vec!["updated_at = ?1"];
3665 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3666 let mut param_idx = 2;
3667
3668 if let Some(t) = title {
3669 updates.push(format!("title = ?{param_idx}").leak());
3670 params.push(Box::new(t.to_string()));
3671 param_idx += 1;
3672 }
3673
3674 if let Some(c) = content {
3675 updates.push(format!("content = ?{param_idx}").leak());
3676 params.push(Box::new(c.to_string()));
3677 param_idx += 1;
3678 }
3679
3680 if let Some(s) = status {
3681 updates.push(format!("status = ?{param_idx}").leak());
3682 params.push(Box::new(s.to_string()));
3683 param_idx += 1;
3684
3685 if s == "completed" {
3687 updates.push(format!("completed_at = ?{param_idx}").leak());
3688 params.push(Box::new(now));
3689 param_idx += 1;
3690 }
3691 }
3692
3693 if let Some(sc) = success_criteria {
3694 updates.push(format!("success_criteria = ?{param_idx}").leak());
3695 params.push(Box::new(sc.to_string()));
3696 param_idx += 1;
3697 }
3698
3699 params.push(Box::new(id.to_string()));
3701
3702 let sql = format!(
3703 "UPDATE plans SET {} WHERE id = ?{}",
3704 updates.join(", "),
3705 param_idx
3706 );
3707
3708 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3709 let affected = tx.execute(&sql, param_refs.as_slice())?;
3710
3711 if affected == 0 {
3712 return Err(Error::Other(format!("Plan not found: {id}")));
3713 }
3714
3715 let event_type = if status == Some("completed") {
3716 EventType::PlanCompleted
3717 } else {
3718 EventType::PlanUpdated
3719 };
3720 ctx.record_event("plan", id, event_type);
3721 Ok(())
3722 })
3723 }
3724
3725 pub fn store_embedding_chunk(
3738 &mut self,
3739 id: &str,
3740 item_id: &str,
3741 chunk_index: i32,
3742 chunk_text: &str,
3743 embedding: &[f32],
3744 provider: &str,
3745 model: &str,
3746 ) -> Result<()> {
3747 let now = chrono::Utc::now().timestamp_millis();
3748 let dimensions = embedding.len() as i32;
3749
3750 let blob: Vec<u8> = embedding
3752 .iter()
3753 .flat_map(|f| f.to_le_bytes())
3754 .collect();
3755
3756 self.conn.execute(
3757 "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
3758 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
3759 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
3760 chunk_text = excluded.chunk_text,
3761 embedding = excluded.embedding,
3762 dimensions = excluded.dimensions,
3763 provider = excluded.provider,
3764 model = excluded.model,
3765 created_at = excluded.created_at",
3766 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, provider, model, now],
3767 )?;
3768
3769 self.conn.execute(
3771 "UPDATE context_items SET
3772 embedding_status = 'complete',
3773 embedding_provider = ?1,
3774 embedding_model = ?2,
3775 chunk_count = COALESCE(
3776 (SELECT MAX(chunk_index) + 1 FROM embedding_chunks WHERE item_id = ?3),
3777 1
3778 ),
3779 embedded_at = ?4
3780 WHERE id = ?3",
3781 rusqlite::params![provider, model, item_id, now],
3782 )?;
3783
3784 Ok(())
3785 }
3786
3787 pub fn get_embedding_chunks(&self, item_id: &str) -> Result<Vec<EmbeddingChunk>> {
3793 let mut stmt = self.conn.prepare(
3794 "SELECT id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at
3795 FROM embedding_chunks
3796 WHERE item_id = ?1
3797 ORDER BY chunk_index ASC",
3798 )?;
3799
3800 let rows = stmt.query_map([item_id], |row| {
3801 let blob: Vec<u8> = row.get(4)?;
3802 let dimensions: i32 = row.get(5)?;
3803
3804 let embedding: Vec<f32> = blob
3806 .chunks_exact(4)
3807 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
3808 .collect();
3809
3810 Ok(EmbeddingChunk {
3811 id: row.get(0)?,
3812 item_id: row.get(1)?,
3813 chunk_index: row.get(2)?,
3814 chunk_text: row.get(3)?,
3815 embedding,
3816 dimensions: dimensions as usize,
3817 provider: row.get(6)?,
3818 model: row.get(7)?,
3819 created_at: row.get(8)?,
3820 })
3821 })?;
3822
3823 rows.collect::<std::result::Result<Vec<_>, _>>()
3824 .map_err(Error::from)
3825 }
3826
3827 pub fn get_items_without_embeddings(
3833 &self,
3834 session_id: Option<&str>,
3835 limit: Option<u32>,
3836 ) -> Result<Vec<ContextItem>> {
3837 let limit = limit.unwrap_or(100);
3838
3839 let sql = if let Some(sid) = session_id {
3840 format!(
3841 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3842 FROM context_items
3843 WHERE session_id = '{}' AND (embedding_status IS NULL OR embedding_status = 'none')
3844 ORDER BY created_at DESC
3845 LIMIT {}",
3846 sid, limit
3847 )
3848 } else {
3849 format!(
3850 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3851 FROM context_items
3852 WHERE embedding_status IS NULL OR embedding_status = 'none'
3853 ORDER BY created_at DESC
3854 LIMIT {}",
3855 limit
3856 )
3857 };
3858
3859 let mut stmt = self.conn.prepare(&sql)?;
3860 let rows = stmt.query_map([], |row| {
3861 Ok(ContextItem {
3862 id: row.get(0)?,
3863 session_id: row.get(1)?,
3864 key: row.get(2)?,
3865 value: row.get(3)?,
3866 category: row.get(4)?,
3867 priority: row.get(5)?,
3868 channel: row.get(6)?,
3869 tags: row.get(7)?,
3870 size: row.get(8)?,
3871 created_at: row.get(9)?,
3872 updated_at: row.get(10)?,
3873 })
3874 })?;
3875
3876 rows.collect::<std::result::Result<Vec<_>, _>>()
3877 .map_err(Error::from)
3878 }
3879
3880 pub fn count_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
3886 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
3887 let with: i64 = self.conn.query_row(
3888 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND embedding_status = 'complete'",
3889 [sid],
3890 |row| row.get(0),
3891 )?;
3892 let without: i64 = self.conn.query_row(
3893 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (embedding_status IS NULL OR embedding_status = 'none')",
3894 [sid],
3895 |row| row.get(0),
3896 )?;
3897 (with, without)
3898 } else {
3899 let with: i64 = self.conn.query_row(
3900 "SELECT COUNT(*) FROM context_items WHERE embedding_status = 'complete'",
3901 [],
3902 |row| row.get(0),
3903 )?;
3904 let without: i64 = self.conn.query_row(
3905 "SELECT COUNT(*) FROM context_items WHERE embedding_status IS NULL OR embedding_status = 'none'",
3906 [],
3907 |row| row.get(0),
3908 )?;
3909 (with, without)
3910 };
3911
3912 Ok(EmbeddingStats {
3913 with_embeddings: with_embeddings as usize,
3914 without_embeddings: without_embeddings as usize,
3915 })
3916 }
3917
3918 pub fn semantic_search(
3928 &self,
3929 query_embedding: &[f32],
3930 session_id: Option<&str>,
3931 limit: usize,
3932 threshold: f32,
3933 ) -> Result<Vec<SemanticSearchResult>> {
3934 let sql = if let Some(sid) = session_id {
3936 format!(
3937 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
3938 ci.key, ci.value, ci.category, ci.priority
3939 FROM embedding_chunks ec
3940 INNER JOIN context_items ci ON ec.item_id = ci.id
3941 WHERE ci.session_id = '{}'",
3942 sid
3943 )
3944 } else {
3945 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
3946 ci.key, ci.value, ci.category, ci.priority
3947 FROM embedding_chunks ec
3948 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
3949 };
3950
3951 let mut stmt = self.conn.prepare(&sql)?;
3952 let rows = stmt.query_map([], |row| {
3953 let blob: Vec<u8> = row.get(4)?;
3954 let embedding: Vec<f32> = blob
3955 .chunks_exact(4)
3956 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
3957 .collect();
3958
3959 Ok((
3960 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
3964 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
3969 })?;
3970
3971 let mut results: Vec<SemanticSearchResult> = rows
3973 .filter_map(|row| row.ok())
3974 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
3975 let similarity = cosine_similarity(query_embedding, &embedding);
3976 SemanticSearchResult {
3977 item_id,
3978 chunk_index,
3979 chunk_text,
3980 similarity,
3981 key,
3982 value,
3983 category,
3984 priority,
3985 }
3986 })
3987 .filter(|r| r.similarity >= threshold)
3988 .collect();
3989
3990 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
3992
3993 let mut seen_items = std::collections::HashSet::new();
3995 let deduped: Vec<SemanticSearchResult> = results
3996 .into_iter()
3997 .filter(|r| seen_items.insert(r.item_id.clone()))
3998 .take(limit)
3999 .collect();
4000
4001 Ok(deduped)
4002 }
4003
4004 pub fn delete_embeddings(&mut self, item_id: &str) -> Result<()> {
4010 self.conn.execute(
4011 "DELETE FROM embedding_chunks WHERE item_id = ?1",
4012 [item_id],
4013 )?;
4014
4015 self.conn.execute(
4016 "UPDATE context_items SET
4017 embedding_status = 'none',
4018 embedding_provider = NULL,
4019 embedding_model = NULL,
4020 chunk_count = 0,
4021 embedded_at = NULL
4022 WHERE id = ?1",
4023 [item_id],
4024 )?;
4025
4026 Ok(())
4027 }
4028
4029 pub fn get_embedding_meta(&self, key: &str) -> Result<Option<String>> {
4035 let value = self.conn.query_row(
4036 "SELECT value FROM embeddings_meta WHERE key = ?1",
4037 [key],
4038 |row| row.get(0),
4039 ).optional()?;
4040 Ok(value)
4041 }
4042
4043 pub fn set_embedding_meta(&mut self, key: &str, value: &str) -> Result<()> {
4049 let now = chrono::Utc::now().timestamp_millis();
4050 self.conn.execute(
4051 "INSERT INTO embeddings_meta (key, value, updated_at)
4052 VALUES (?1, ?2, ?3)
4053 ON CONFLICT(key) DO UPDATE SET
4054 value = excluded.value,
4055 updated_at = excluded.updated_at",
4056 rusqlite::params![key, value, now],
4057 )?;
4058 Ok(())
4059 }
4060
4061 pub fn store_fast_embedding_chunk(
4074 &mut self,
4075 id: &str,
4076 item_id: &str,
4077 chunk_index: i32,
4078 chunk_text: &str,
4079 embedding: &[f32],
4080 model: &str,
4081 ) -> Result<()> {
4082 let now = chrono::Utc::now().timestamp_millis();
4083 let dimensions = embedding.len() as i32;
4084
4085 let blob: Vec<u8> = embedding
4087 .iter()
4088 .flat_map(|f| f.to_le_bytes())
4089 .collect();
4090
4091 self.conn.execute(
4092 "INSERT INTO embedding_chunks_fast (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4093 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'model2vec', ?7, ?8)
4094 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4095 chunk_text = excluded.chunk_text,
4096 embedding = excluded.embedding,
4097 dimensions = excluded.dimensions,
4098 model = excluded.model,
4099 created_at = excluded.created_at",
4100 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, model, now],
4101 )?;
4102
4103 self.conn.execute(
4105 "UPDATE context_items SET
4106 fast_embedding_status = 'complete',
4107 fast_embedded_at = ?1
4108 WHERE id = ?2",
4109 rusqlite::params![now, item_id],
4110 )?;
4111
4112 Ok(())
4113 }
4114
4115 pub fn search_fast_tier(
4124 &self,
4125 query_embedding: &[f32],
4126 session_id: Option<&str>,
4127 limit: usize,
4128 threshold: f32,
4129 ) -> Result<Vec<SemanticSearchResult>> {
4130 let sql = if let Some(sid) = session_id {
4132 format!(
4133 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4134 ci.key, ci.value, ci.category, ci.priority
4135 FROM embedding_chunks_fast ec
4136 INNER JOIN context_items ci ON ec.item_id = ci.id
4137 WHERE ci.session_id = '{}'",
4138 sid
4139 )
4140 } else {
4141 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4142 ci.key, ci.value, ci.category, ci.priority
4143 FROM embedding_chunks_fast ec
4144 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4145 };
4146
4147 let mut stmt = self.conn.prepare(&sql)?;
4148 let rows = stmt.query_map([], |row| {
4149 let blob: Vec<u8> = row.get(4)?;
4150 let embedding: Vec<f32> = blob
4151 .chunks_exact(4)
4152 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4153 .collect();
4154
4155 Ok((
4156 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
4160 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
4165 })?;
4166
4167 let mut results: Vec<SemanticSearchResult> = rows
4169 .filter_map(|row| row.ok())
4170 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4171 let similarity = cosine_similarity(query_embedding, &embedding);
4172 SemanticSearchResult {
4173 item_id,
4174 chunk_index,
4175 chunk_text,
4176 similarity,
4177 key,
4178 value,
4179 category,
4180 priority,
4181 }
4182 })
4183 .filter(|r| r.similarity >= threshold)
4184 .collect();
4185
4186 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4188
4189 let mut seen_items = std::collections::HashSet::new();
4191 let deduped: Vec<SemanticSearchResult> = results
4192 .into_iter()
4193 .filter(|r| seen_items.insert(r.item_id.clone()))
4194 .take(limit)
4195 .collect();
4196
4197 Ok(deduped)
4198 }
4199
4200 pub fn get_items_needing_quality_upgrade(
4208 &self,
4209 session_id: Option<&str>,
4210 limit: Option<u32>,
4211 ) -> Result<Vec<ContextItem>> {
4212 let limit = limit.unwrap_or(100);
4213
4214 let sql = if let Some(sid) = session_id {
4215 format!(
4216 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4217 FROM context_items
4218 WHERE session_id = '{}'
4219 AND fast_embedding_status = 'complete'
4220 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4221 ORDER BY created_at DESC
4222 LIMIT {}",
4223 sid, limit
4224 )
4225 } else {
4226 format!(
4227 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4228 FROM context_items
4229 WHERE fast_embedding_status = 'complete'
4230 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4231 ORDER BY created_at DESC
4232 LIMIT {}",
4233 limit
4234 )
4235 };
4236
4237 let mut stmt = self.conn.prepare(&sql)?;
4238 let rows = stmt.query_map([], |row| {
4239 Ok(ContextItem {
4240 id: row.get(0)?,
4241 session_id: row.get(1)?,
4242 key: row.get(2)?,
4243 value: row.get(3)?,
4244 category: row.get(4)?,
4245 priority: row.get(5)?,
4246 channel: row.get(6)?,
4247 tags: row.get(7)?,
4248 size: row.get(8)?,
4249 created_at: row.get(9)?,
4250 updated_at: row.get(10)?,
4251 })
4252 })?;
4253
4254 rows.collect::<std::result::Result<Vec<_>, _>>()
4255 .map_err(Error::from)
4256 }
4257
4258 pub fn delete_fast_embeddings(&mut self, item_id: &str) -> Result<()> {
4264 self.conn.execute(
4265 "DELETE FROM embedding_chunks_fast WHERE item_id = ?1",
4266 [item_id],
4267 )?;
4268
4269 self.conn.execute(
4270 "UPDATE context_items SET
4271 fast_embedding_status = 'none',
4272 fast_embedded_at = NULL
4273 WHERE id = ?1",
4274 [item_id],
4275 )?;
4276
4277 Ok(())
4278 }
4279
4280 pub fn count_fast_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4286 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4287 let with: i64 = self.conn.query_row(
4288 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND fast_embedding_status = 'complete'",
4289 [sid],
4290 |row| row.get(0),
4291 )?;
4292 let without: i64 = self.conn.query_row(
4293 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (fast_embedding_status IS NULL OR fast_embedding_status = 'none')",
4294 [sid],
4295 |row| row.get(0),
4296 )?;
4297 (with, without)
4298 } else {
4299 let with: i64 = self.conn.query_row(
4300 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status = 'complete'",
4301 [],
4302 |row| row.get(0),
4303 )?;
4304 let without: i64 = self.conn.query_row(
4305 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status IS NULL OR fast_embedding_status = 'none'",
4306 [],
4307 |row| row.get(0),
4308 )?;
4309 (with, without)
4310 };
4311
4312 Ok(EmbeddingStats {
4313 with_embeddings: with_embeddings as usize,
4314 without_embeddings: without_embeddings as usize,
4315 })
4316 }
4317}
4318
4319fn map_plan_row(row: &rusqlite::Row) -> rusqlite::Result<Plan> {
4321 let status_str: String = row.get(6)?;
4322 Ok(Plan {
4323 id: row.get(0)?,
4324 short_id: row.get(1)?,
4325 project_id: row.get(2)?,
4326 project_path: row.get(3)?,
4327 title: row.get(4)?,
4328 content: row.get(5)?,
4329 status: PlanStatus::from_str(&status_str),
4330 success_criteria: row.get(7)?,
4331 created_in_session: row.get(8)?,
4332 completed_in_session: row.get(9)?,
4333 created_at: row.get(10)?,
4334 updated_at: row.get(11)?,
4335 completed_at: row.get(12)?,
4336 })
4337}
4338
4339fn map_project_row(row: &rusqlite::Row) -> rusqlite::Result<Project> {
4341 Ok(Project {
4342 id: row.get(0)?,
4343 project_path: row.get(1)?,
4344 name: row.get(2)?,
4345 description: row.get(3)?,
4346 issue_prefix: row.get(4)?,
4347 next_issue_number: row.get(5)?,
4348 plan_prefix: row.get(6)?,
4349 next_plan_number: row.get(7)?,
4350 created_at: row.get(8)?,
4351 updated_at: row.get(9)?,
4352 })
4353}
4354
4355fn map_issue_row(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
4357 Ok(Issue {
4358 id: row.get(0)?,
4359 short_id: row.get(1)?,
4360 project_path: row.get(2)?,
4361 title: row.get(3)?,
4362 description: row.get(4)?,
4363 details: row.get(5)?,
4364 status: row.get(6)?,
4365 priority: row.get(7)?,
4366 issue_type: row.get(8)?,
4367 plan_id: row.get(9)?,
4368 created_by_agent: row.get(10)?,
4369 assigned_to_agent: row.get(11)?,
4370 created_at: row.get(12)?,
4371 updated_at: row.get(13)?,
4372 closed_at: row.get(14)?,
4373 })
4374}
4375
4376#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4382pub struct Session {
4383 pub id: String,
4384 pub name: String,
4385 pub description: Option<String>,
4386 pub branch: Option<String>,
4387 pub channel: Option<String>,
4388 pub project_path: Option<String>,
4389 pub status: String,
4390 pub ended_at: Option<i64>,
4391 pub created_at: i64,
4392 pub updated_at: i64,
4393}
4394
4395#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4397pub struct ContextItem {
4398 pub id: String,
4399 pub session_id: String,
4400 pub key: String,
4401 pub value: String,
4402 pub category: String,
4403 pub priority: String,
4404 pub channel: Option<String>,
4405 pub tags: Option<String>,
4406 pub size: i64,
4407 pub created_at: i64,
4408 pub updated_at: i64,
4409}
4410
4411#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4414pub struct Issue {
4415 pub id: String,
4416 pub short_id: Option<String>,
4417 pub project_path: String,
4418 pub title: String,
4419 pub description: Option<String>,
4420 pub details: Option<String>,
4421 pub status: String,
4422 pub priority: i32,
4423 pub issue_type: String,
4424 pub plan_id: Option<String>,
4425 pub created_by_agent: Option<String>,
4426 pub assigned_to_agent: Option<String>,
4427 pub created_at: i64,
4428 pub updated_at: i64,
4429 pub closed_at: Option<i64>,
4430}
4431
4432#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4434pub struct Checkpoint {
4435 pub id: String,
4436 pub session_id: String,
4437 pub name: String,
4438 pub description: Option<String>,
4439 pub git_status: Option<String>,
4440 pub git_branch: Option<String>,
4441 pub created_at: i64,
4442 pub item_count: i64,
4443}
4444
4445#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4447pub struct Memory {
4448 pub id: String,
4449 pub project_path: String,
4450 pub key: String,
4451 pub value: String,
4452 pub category: String,
4453 pub created_at: i64,
4454 pub updated_at: i64,
4455}
4456
4457#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4459pub struct SyncDeletion {
4460 pub id: i64,
4462 pub entity_type: String,
4464 pub entity_id: String,
4466 pub project_path: String,
4468 pub deleted_at: i64,
4470 pub deleted_by: String,
4472}
4473
4474#[derive(Debug, Clone, serde::Serialize)]
4479pub struct EmbeddingChunk {
4480 pub id: String,
4482 pub item_id: String,
4484 pub chunk_index: i32,
4486 pub chunk_text: String,
4488 pub embedding: Vec<f32>,
4490 pub dimensions: usize,
4492 pub provider: String,
4494 pub model: String,
4496 pub created_at: i64,
4498}
4499
4500#[derive(Debug, Clone, serde::Serialize)]
4502pub struct EmbeddingStats {
4503 pub with_embeddings: usize,
4505 pub without_embeddings: usize,
4507}
4508
4509#[derive(Debug, Clone, serde::Serialize)]
4511pub struct SemanticSearchResult {
4512 pub item_id: String,
4514 pub chunk_index: i32,
4516 pub chunk_text: String,
4518 pub similarity: f32,
4520 pub key: String,
4522 pub value: String,
4524 pub category: String,
4526 pub priority: String,
4528}
4529
4530fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
4540 if a.len() != b.len() || a.is_empty() {
4541 return 0.0;
4542 }
4543
4544 let mut dot_product = 0.0;
4545 let mut norm_a = 0.0;
4546 let mut norm_b = 0.0;
4547
4548 for (x, y) in a.iter().zip(b.iter()) {
4549 dot_product += x * y;
4550 norm_a += x * x;
4551 norm_b += y * y;
4552 }
4553
4554 let magnitude = (norm_a * norm_b).sqrt();
4555 if magnitude == 0.0 {
4556 0.0
4557 } else {
4558 dot_product / magnitude
4559 }
4560}
4561
4562fn generate_short_id() -> String {
4564 use std::time::{SystemTime, UNIX_EPOCH};
4565 let now = SystemTime::now()
4566 .duration_since(UNIX_EPOCH)
4567 .unwrap()
4568 .as_millis();
4569 format!("{:04x}", (now & 0xFFFF) as u16)
4570}
4571
4572#[cfg(test)]
4573mod tests {
4574 use super::*;
4575
4576 #[test]
4577 fn test_open_memory() {
4578 let storage = SqliteStorage::open_memory();
4579 assert!(storage.is_ok());
4580 }
4581
4582 #[test]
4583 fn test_session_crud() {
4584 let mut storage = SqliteStorage::open_memory().unwrap();
4585
4586 storage
4588 .create_session(
4589 "sess_1",
4590 "Test Session",
4591 Some("A test session"),
4592 Some("/test/project"),
4593 Some("main"),
4594 "test-actor",
4595 )
4596 .unwrap();
4597
4598 let session = storage.get_session("sess_1").unwrap();
4600 assert!(session.is_some());
4601 let session = session.unwrap();
4602 assert_eq!(session.name, "Test Session");
4603 assert_eq!(session.status, "active");
4604
4605 let sessions = storage
4607 .list_sessions(Some("/test/project"), None, None)
4608 .unwrap();
4609 assert_eq!(sessions.len(), 1);
4610
4611 storage
4613 .update_session_status("sess_1", "completed", "test-actor")
4614 .unwrap();
4615 let session = storage.get_session("sess_1").unwrap().unwrap();
4616 assert_eq!(session.status, "completed");
4617 assert!(session.ended_at.is_some());
4618 }
4619
4620 #[test]
4621 fn test_context_item_crud() {
4622 let mut storage = SqliteStorage::open_memory().unwrap();
4623
4624 storage
4626 .create_session("sess_1", "Test", None, None, None, "actor")
4627 .unwrap();
4628
4629 storage
4631 .save_context_item(
4632 "item_1",
4633 "sess_1",
4634 "test-key",
4635 "test value",
4636 Some("note"),
4637 Some("high"),
4638 "actor",
4639 )
4640 .unwrap();
4641
4642 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
4644 assert_eq!(items.len(), 1);
4645 assert_eq!(items[0].key, "test-key");
4646 assert_eq!(items[0].priority, "high");
4647
4648 storage
4650 .save_context_item(
4651 "item_1",
4652 "sess_1",
4653 "test-key",
4654 "updated value",
4655 Some("decision"),
4656 None,
4657 "actor",
4658 )
4659 .unwrap();
4660
4661 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
4662 assert_eq!(items.len(), 1);
4663 assert_eq!(items[0].value, "updated value");
4664
4665 storage
4667 .delete_context_item("sess_1", "test-key", "actor")
4668 .unwrap();
4669 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
4670 assert_eq!(items.len(), 0);
4671 }
4672
4673 #[test]
4674 fn test_issue_crud() {
4675 let mut storage = SqliteStorage::open_memory().unwrap();
4676
4677 storage
4679 .create_issue(
4680 "issue_1",
4681 Some("TST-1"),
4682 "/test/project",
4683 "Test Issue",
4684 Some("Description"),
4685 None, Some("task"), Some(3), None, "actor",
4690 )
4691 .unwrap();
4692
4693 let issue = storage.get_issue("issue_1", None).unwrap();
4695 assert!(issue.is_some());
4696 let issue = issue.unwrap();
4697 assert_eq!(issue.title, "Test Issue");
4698 assert_eq!(issue.priority, 3);
4699
4700 let issue = storage
4702 .get_issue("TST-1", Some("/test/project"))
4703 .unwrap();
4704 assert!(issue.is_some());
4705
4706 let issues = storage
4708 .list_issues("/test/project", None, None, None)
4709 .unwrap();
4710 assert_eq!(issues.len(), 1);
4711
4712 storage.claim_issue("issue_1", "agent-1").unwrap();
4714 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
4715 assert_eq!(issue.assigned_to_agent, Some("agent-1".to_string()));
4716 assert_eq!(issue.status, "in_progress");
4717
4718 storage.release_issue("issue_1", "agent-1").unwrap();
4720 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
4721 assert!(issue.assigned_to_agent.is_none());
4722 assert_eq!(issue.status, "open");
4723
4724 storage
4726 .update_issue_status("issue_1", "closed", "actor")
4727 .unwrap();
4728 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
4729 assert_eq!(issue.status, "closed");
4730 assert!(issue.closed_at.is_some());
4731 }
4732}