1use crate::error::{Error, Result};
7use crate::model::{Plan, PlanStatus, Project};
8use crate::storage::events::{insert_event, Event, EventType};
9use crate::storage::schema::apply_schema;
10use rusqlite::{Connection, OptionalExtension, Transaction};
11use std::collections::HashSet;
12use std::path::Path;
13use std::time::Duration;
14
15#[derive(Debug)]
17pub struct SqliteStorage {
18 conn: Connection,
19}
20
21pub struct MutationContext {
28 pub op_name: String,
30 pub actor: String,
32 pub events: Vec<Event>,
34 pub dirty_sessions: HashSet<String>,
36 pub dirty_issues: HashSet<String>,
37 pub dirty_items: HashSet<String>,
38 pub dirty_plans: HashSet<String>,
39}
40
41impl MutationContext {
42 #[must_use]
44 pub fn new(op_name: &str, actor: &str) -> Self {
45 Self {
46 op_name: op_name.to_string(),
47 actor: actor.to_string(),
48 events: Vec::new(),
49 dirty_sessions: HashSet::new(),
50 dirty_issues: HashSet::new(),
51 dirty_items: HashSet::new(),
52 dirty_plans: HashSet::new(),
53 }
54 }
55
56 pub fn record_event(
58 &mut self,
59 entity_type: &str,
60 entity_id: &str,
61 event_type: EventType,
62 ) {
63 self.events.push(Event::new(
64 entity_type,
65 entity_id,
66 event_type,
67 &self.actor,
68 ));
69 }
70
71 pub fn record_change(
73 &mut self,
74 entity_type: &str,
75 entity_id: &str,
76 event_type: EventType,
77 old_value: Option<String>,
78 new_value: Option<String>,
79 ) {
80 self.events.push(
81 Event::new(entity_type, entity_id, event_type, &self.actor)
82 .with_values(old_value, new_value),
83 );
84 }
85
86 pub fn mark_session_dirty(&mut self, session_id: &str) {
88 self.dirty_sessions.insert(session_id.to_string());
89 }
90
91 pub fn mark_issue_dirty(&mut self, issue_id: &str) {
93 self.dirty_issues.insert(issue_id.to_string());
94 }
95
96 pub fn mark_item_dirty(&mut self, item_id: &str) {
98 self.dirty_items.insert(item_id.to_string());
99 }
100
101 pub fn mark_plan_dirty(&mut self, plan_id: &str) {
103 self.dirty_plans.insert(plan_id.to_string());
104 }
105}
106
107#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
112pub struct BackfillStats {
113 pub sessions: usize,
115 pub issues: usize,
117 pub context_items: usize,
119 pub plans: usize,
121}
122
123impl BackfillStats {
124 #[must_use]
126 pub fn any(&self) -> bool {
127 self.sessions > 0 || self.issues > 0 || self.context_items > 0 || self.plans > 0
128 }
129
130 #[must_use]
132 pub fn total(&self) -> usize {
133 self.sessions + self.issues + self.context_items + self.plans
134 }
135}
136
137#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
142pub struct ProjectCounts {
143 pub sessions: usize,
145 pub issues: usize,
147 pub context_items: usize,
149 pub memories: usize,
151 pub checkpoints: usize,
153}
154
155impl ProjectCounts {
156 #[must_use]
158 pub fn total(&self) -> usize {
159 self.sessions + self.issues + self.context_items + self.memories + self.checkpoints
160 }
161}
162
163impl SqliteStorage {
164 pub fn open(path: &Path) -> Result<Self> {
172 Self::open_with_timeout(path, None)
173 }
174
175 pub fn open_with_timeout(path: &Path, timeout_ms: Option<u64>) -> Result<Self> {
181 let conn = Connection::open(path)?;
182
183 if let Some(timeout) = timeout_ms {
184 conn.busy_timeout(Duration::from_millis(timeout))?;
185 } else {
186 conn.busy_timeout(Duration::from_secs(5))?;
188 }
189
190 apply_schema(&conn)?;
191 Ok(Self { conn })
192 }
193
194 pub fn open_memory() -> Result<Self> {
200 let conn = Connection::open_in_memory()?;
201 apply_schema(&conn)?;
202 Ok(Self { conn })
203 }
204
205 #[must_use]
207 pub fn conn(&self) -> &Connection {
208 &self.conn
209 }
210
211 pub fn mutate<F, R>(&mut self, op: &str, actor: &str, f: F) -> Result<R>
224 where
225 F: FnOnce(&Transaction, &mut MutationContext) -> Result<R>,
226 {
227 let tx = self
228 .conn
229 .transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
230
231 let mut ctx = MutationContext::new(op, actor);
232
233 let result = f(&tx, &mut ctx)?;
235
236 for event in &ctx.events {
238 insert_event(&tx, event)?;
239 }
240
241 tx.commit()?;
246
247 Ok(result)
248 }
249
250 pub fn create_session(
260 &mut self,
261 id: &str,
262 name: &str,
263 description: Option<&str>,
264 project_path: Option<&str>,
265 branch: Option<&str>,
266 actor: &str,
267 ) -> Result<()> {
268 let now = chrono::Utc::now().timestamp_millis();
269
270 self.mutate("create_session", actor, |tx, ctx| {
271 tx.execute(
272 "INSERT INTO sessions (id, name, description, project_path, branch, status, created_at, updated_at)
273 VALUES (?1, ?2, ?3, ?4, ?5, 'active', ?6, ?6)",
274 rusqlite::params![id, name, description, project_path, branch, now],
275 )?;
276
277 if let Some(path) = project_path {
279 tx.execute(
280 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
281 rusqlite::params![id, path, now],
282 )?;
283 }
284
285 ctx.record_event("session", id, EventType::SessionCreated);
286 ctx.mark_session_dirty(id);
287
288 Ok(())
289 })
290 }
291
292 pub fn get_session(&self, id: &str) -> Result<Option<Session>> {
298 let mut stmt = self.conn.prepare(
299 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
300 FROM sessions WHERE id = ?1",
301 )?;
302
303 let session = stmt
304 .query_row([id], |row| {
305 Ok(Session {
306 id: row.get(0)?,
307 name: row.get(1)?,
308 description: row.get(2)?,
309 branch: row.get(3)?,
310 channel: row.get(4)?,
311 project_path: row.get(5)?,
312 status: row.get(6)?,
313 ended_at: row.get(7)?,
314 created_at: row.get(8)?,
315 updated_at: row.get(9)?,
316 })
317 })
318 .optional()?;
319
320 Ok(session)
321 }
322
323 pub fn list_sessions(
329 &self,
330 project_path: Option<&str>,
331 status: Option<&str>,
332 limit: Option<u32>,
333 ) -> Result<Vec<Session>> {
334 self.list_sessions_with_search(project_path, status, limit, None)
335 }
336
337 pub fn list_sessions_with_search(
346 &self,
347 project_path: Option<&str>,
348 status: Option<&str>,
349 limit: Option<u32>,
350 search: Option<&str>,
351 ) -> Result<Vec<Session>> {
352 let limit = limit.unwrap_or(50);
353
354 let mut conditions: Vec<String> = Vec::new();
356 let mut params: Vec<String> = Vec::new();
357 let mut param_idx = 1;
358
359 let (from_clause, select_distinct) = if let Some(path) = project_path {
361 conditions.push(format!("sp.project_path = ?{param_idx}"));
363 params.push(path.to_string());
364 param_idx += 1;
365 (
366 "sessions s JOIN session_projects sp ON s.id = sp.session_id".to_string(),
367 "DISTINCT ",
368 )
369 } else {
370 ("sessions s".to_string(), "")
372 };
373
374 if let Some(st) = status {
375 conditions.push(format!("s.status = ?{param_idx}"));
376 params.push(st.to_string());
377 param_idx += 1;
378 }
379
380 if let Some(search_term) = search {
381 conditions.push(format!(
383 "(s.name LIKE ?{param_idx} COLLATE NOCASE OR s.description LIKE ?{param_idx} COLLATE NOCASE)"
384 ));
385 params.push(format!("%{search_term}%"));
386 param_idx += 1;
387 }
388
389 let where_clause = if conditions.is_empty() {
390 " WHERE 1=1".to_string()
391 } else {
392 format!(" WHERE {}", conditions.join(" AND "))
393 };
394
395 let sql = format!(
396 "SELECT {select_distinct}s.id, s.name, s.description, s.branch, s.channel, s.project_path, s.status, s.ended_at, s.created_at, s.updated_at
397 FROM {from_clause}{where_clause}
398 ORDER BY s.updated_at DESC LIMIT ?{param_idx}"
399 );
400 params.push(limit.to_string());
401
402 let mut stmt = self.conn.prepare(&sql)?;
403 let params_refs: Vec<&dyn rusqlite::ToSql> = params
404 .iter()
405 .map(|s| s as &dyn rusqlite::ToSql)
406 .collect();
407
408 let rows = stmt.query_map(params_refs.as_slice(), |row| {
409 Ok(Session {
410 id: row.get(0)?,
411 name: row.get(1)?,
412 description: row.get(2)?,
413 branch: row.get(3)?,
414 channel: row.get(4)?,
415 project_path: row.get(5)?,
416 status: row.get(6)?,
417 ended_at: row.get(7)?,
418 created_at: row.get(8)?,
419 updated_at: row.get(9)?,
420 })
421 })?;
422
423 rows.collect::<std::result::Result<Vec<_>, _>>()
424 .map_err(Error::from)
425 }
426
427 pub fn update_session_status(
433 &mut self,
434 id: &str,
435 status: &str,
436 actor: &str,
437 ) -> Result<()> {
438 let now = chrono::Utc::now().timestamp_millis();
439 let ended_at = if status == "completed" || status == "paused" {
440 Some(now)
441 } else {
442 None
443 };
444
445 self.mutate("update_session_status", actor, |tx, ctx| {
446 let rows = tx.execute(
447 "UPDATE sessions SET status = ?1, ended_at = ?2, updated_at = ?3 WHERE id = ?4",
448 rusqlite::params![status, ended_at, now, id],
449 )?;
450
451 if rows == 0 {
452 return Err(Error::SessionNotFound { id: id.to_string() });
453 }
454
455 let event_type = match status {
456 "paused" => EventType::SessionPaused,
457 "completed" => EventType::SessionCompleted,
458 _ => EventType::SessionUpdated,
459 };
460 ctx.record_event("session", id, event_type);
461 ctx.mark_session_dirty(id);
462
463 Ok(())
464 })
465 }
466
467 pub fn rename_session(
473 &mut self,
474 id: &str,
475 new_name: &str,
476 actor: &str,
477 ) -> Result<()> {
478 let now = chrono::Utc::now().timestamp_millis();
479
480 self.mutate("rename_session", actor, |tx, ctx| {
481 let rows = tx.execute(
482 "UPDATE sessions SET name = ?1, updated_at = ?2 WHERE id = ?3",
483 rusqlite::params![new_name, now, id],
484 )?;
485
486 if rows == 0 {
487 return Err(Error::SessionNotFound { id: id.to_string() });
488 }
489
490 ctx.record_event("session", id, EventType::SessionUpdated);
491 ctx.mark_session_dirty(id);
492
493 Ok(())
494 })
495 }
496
497 pub fn delete_session(&mut self, id: &str, actor: &str) -> Result<()> {
508 self.mutate("delete_session", actor, |tx, ctx| {
509 let exists: bool = tx
511 .query_row(
512 "SELECT 1 FROM sessions WHERE id = ?1",
513 [id],
514 |_| Ok(true),
515 )
516 .unwrap_or(false);
517
518 if !exists {
519 return Err(Error::SessionNotFound { id: id.to_string() });
520 }
521
522 tx.execute(
524 "DELETE FROM context_items WHERE session_id = ?1",
525 [id],
526 )?;
527
528 tx.execute(
530 "DELETE FROM checkpoints WHERE session_id = ?1",
531 [id],
532 )?;
533
534 tx.execute(
536 "DELETE FROM session_projects WHERE session_id = ?1",
537 [id],
538 )?;
539
540 tx.execute("DELETE FROM sessions WHERE id = ?1", [id])?;
542
543 ctx.record_event("session", id, EventType::SessionDeleted);
544
545 Ok(())
546 })
547 }
548
549 pub fn add_session_path(
555 &mut self,
556 session_id: &str,
557 project_path: &str,
558 actor: &str,
559 ) -> Result<()> {
560 let now = chrono::Utc::now().timestamp_millis();
561
562 self.mutate("add_session_path", actor, |tx, ctx| {
563 let exists: bool = tx
565 .query_row(
566 "SELECT 1 FROM sessions WHERE id = ?1",
567 [session_id],
568 |_| Ok(true),
569 )
570 .unwrap_or(false);
571
572 if !exists {
573 return Err(Error::SessionNotFound { id: session_id.to_string() });
574 }
575
576 let result = tx.execute(
578 "INSERT INTO session_projects (session_id, project_path, added_at) VALUES (?1, ?2, ?3)",
579 rusqlite::params![session_id, project_path, now],
580 );
581
582 match result {
583 Ok(_) => {
584 ctx.record_event("session", session_id, EventType::SessionPathAdded);
585 ctx.mark_session_dirty(session_id);
586 Ok(())
587 }
588 Err(rusqlite::Error::SqliteFailure(err, _))
589 if err.code == rusqlite::ErrorCode::ConstraintViolation =>
590 {
591 Err(Error::Other(format!(
592 "Path already added to session: {project_path}"
593 )))
594 }
595 Err(e) => Err(e.into()),
596 }
597 })
598 }
599
600 pub fn remove_session_path(
608 &mut self,
609 session_id: &str,
610 project_path: &str,
611 actor: &str,
612 ) -> Result<()> {
613 self.mutate("remove_session_path", actor, |tx, ctx| {
614 let session_path: Option<String> = tx
616 .query_row(
617 "SELECT project_path FROM sessions WHERE id = ?1",
618 [session_id],
619 |row| row.get(0),
620 )
621 .optional()?;
622
623 let primary_path = session_path.ok_or_else(|| Error::SessionNotFound {
624 id: session_id.to_string(),
625 })?;
626
627 if primary_path == project_path {
629 return Err(Error::Other(
630 "Cannot remove primary project path. Use delete_session instead.".to_string(),
631 ));
632 }
633
634 let rows = tx.execute(
636 "DELETE FROM session_projects WHERE session_id = ?1 AND project_path = ?2",
637 rusqlite::params![session_id, project_path],
638 )?;
639
640 if rows == 0 {
641 return Err(Error::Other(format!(
642 "Path not found in session: {project_path}"
643 )));
644 }
645
646 ctx.record_event("session", session_id, EventType::SessionPathRemoved);
647 ctx.mark_session_dirty(session_id);
648
649 Ok(())
650 })
651 }
652
653 pub fn get_session_paths(&self, session_id: &str) -> Result<Vec<String>> {
657 let conn = self.conn();
658
659 let primary_path: Option<String> = conn
661 .query_row(
662 "SELECT project_path FROM sessions WHERE id = ?1",
663 [session_id],
664 |row| row.get(0),
665 )
666 .optional()?;
667
668 let Some(primary) = primary_path else {
669 return Err(Error::SessionNotFound { id: session_id.to_string() });
670 };
671
672 let mut stmt = conn.prepare(
674 "SELECT project_path FROM session_projects WHERE session_id = ?1 ORDER BY added_at",
675 )?;
676
677 let additional_paths: Vec<String> = stmt
678 .query_map([session_id], |row| row.get(0))?
679 .filter_map(|r| r.ok())
680 .collect();
681
682 let mut paths = vec![primary];
684 paths.extend(additional_paths);
685
686 Ok(paths)
687 }
688
689 pub fn save_context_item(
699 &mut self,
700 id: &str,
701 session_id: &str,
702 key: &str,
703 value: &str,
704 category: Option<&str>,
705 priority: Option<&str>,
706 actor: &str,
707 ) -> Result<()> {
708 let now = chrono::Utc::now().timestamp_millis();
709 let category = category.unwrap_or("note");
710 let priority = priority.unwrap_or("normal");
711 let size = value.len() as i64;
712
713 self.mutate("save_context_item", actor, |tx, ctx| {
714 let exists: bool = tx
716 .prepare("SELECT 1 FROM context_items WHERE session_id = ?1 AND key = ?2")?
717 .exists(rusqlite::params![session_id, key])?;
718
719 tx.execute(
720 "INSERT INTO context_items (id, session_id, key, value, category, priority, size, created_at, updated_at)
721 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)
722 ON CONFLICT(session_id, key) DO UPDATE SET
723 value = excluded.value,
724 category = excluded.category,
725 priority = excluded.priority,
726 size = excluded.size,
727 updated_at = excluded.updated_at",
728 rusqlite::params![id, session_id, key, value, category, priority, size, now],
729 )?;
730
731 let event_type = if exists {
732 EventType::ItemUpdated
733 } else {
734 EventType::ItemCreated
735 };
736 ctx.record_event("context_item", id, event_type);
737 ctx.mark_item_dirty(id);
738
739 Ok(())
740 })
741 }
742
743 pub fn get_item_id_by_key(&self, session_id: &str, key: &str) -> Result<Option<String>> {
747 let id = self.conn.query_row(
748 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
749 rusqlite::params![session_id, key],
750 |row| row.get(0),
751 ).optional()?;
752 Ok(id)
753 }
754
755 pub fn get_items_with_fast_embeddings(
760 &self,
761 session_id: &str,
762 ) -> Result<Vec<(ContextItem, Option<Vec<f32>>)>> {
763 let sql = "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
764 ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at,
765 ec.embedding
766 FROM context_items ci
767 LEFT JOIN embedding_chunks_fast ec ON ec.item_id = ci.id AND ec.chunk_index = 0
768 WHERE ci.session_id = ?1
769 ORDER BY ci.updated_at DESC";
770
771 let mut stmt = self.conn.prepare(sql)?;
772 let rows = stmt.query_map(rusqlite::params![session_id], |row| {
773 let item = ContextItem {
774 id: row.get(0)?,
775 session_id: row.get(1)?,
776 key: row.get(2)?,
777 value: row.get(3)?,
778 category: row.get(4)?,
779 priority: row.get(5)?,
780 channel: row.get(6)?,
781 tags: row.get(7)?,
782 size: row.get(8)?,
783 created_at: row.get(9)?,
784 updated_at: row.get(10)?,
785 };
786
787 let embedding: Option<Vec<f32>> = row.get::<_, Option<Vec<u8>>>(11)?
788 .map(|blob| {
789 blob.chunks_exact(4)
790 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
791 .collect()
792 });
793
794 Ok((item, embedding))
795 })?;
796
797 let mut results = Vec::new();
798 for row in rows {
799 results.push(row?);
800 }
801 Ok(results)
802 }
803
804 pub fn get_context_items(
810 &self,
811 session_id: &str,
812 category: Option<&str>,
813 priority: Option<&str>,
814 limit: Option<u32>,
815 ) -> Result<Vec<ContextItem>> {
816 let limit = limit.unwrap_or(100);
817
818 let mut sql = String::from(
819 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
820 FROM context_items WHERE session_id = ?1",
821 );
822
823 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(session_id.to_string())];
824
825 if let Some(cat) = category {
826 sql.push_str(" AND category = ?");
827 params.push(Box::new(cat.to_string()));
828 }
829
830 if let Some(pri) = priority {
831 sql.push_str(" AND priority = ?");
832 params.push(Box::new(pri.to_string()));
833 }
834
835 sql.push_str(" ORDER BY created_at DESC LIMIT ?");
836 params.push(Box::new(limit));
837
838 let mut stmt = self.conn.prepare(&sql)?;
839 let params_refs: Vec<&dyn rusqlite::ToSql> = params
840 .iter()
841 .map(|b| b.as_ref())
842 .collect();
843
844 let rows = stmt.query_map(params_refs.as_slice(), |row| {
845 Ok(ContextItem {
846 id: row.get(0)?,
847 session_id: row.get(1)?,
848 key: row.get(2)?,
849 value: row.get(3)?,
850 category: row.get(4)?,
851 priority: row.get(5)?,
852 channel: row.get(6)?,
853 tags: row.get(7)?,
854 size: row.get(8)?,
855 created_at: row.get(9)?,
856 updated_at: row.get(10)?,
857 })
858 })?;
859
860 rows.collect::<std::result::Result<Vec<_>, _>>()
861 .map_err(Error::from)
862 }
863
864 pub fn delete_context_item(
870 &mut self,
871 session_id: &str,
872 key: &str,
873 actor: &str,
874 ) -> Result<()> {
875 self.mutate("delete_context_item", actor, |tx, ctx| {
876 let info: Option<(String, Option<String>)> = tx
878 .query_row(
879 "SELECT ci.id, s.project_path
880 FROM context_items ci
881 JOIN sessions s ON ci.session_id = s.id
882 WHERE ci.session_id = ?1 AND ci.key = ?2",
883 rusqlite::params![session_id, key],
884 |row| Ok((row.get(0)?, row.get(1)?)),
885 )
886 .optional()?;
887
888 let rows = tx.execute(
889 "DELETE FROM context_items WHERE session_id = ?1 AND key = ?2",
890 rusqlite::params![session_id, key],
891 )?;
892
893 if rows > 0 {
894 if let Some((item_id, project_path)) = info {
895 ctx.record_event("context_item", &item_id, EventType::ItemDeleted);
896
897 if let Some(ref path) = project_path {
899 let now = chrono::Utc::now().timestamp_millis();
900 tx.execute(
901 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
902 VALUES ('context_item', ?1, ?2, ?3, ?4, 0)
903 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
904 deleted_at = excluded.deleted_at,
905 deleted_by = excluded.deleted_by,
906 exported = 0",
907 rusqlite::params![item_id, path, now, ctx.actor],
908 )?;
909 }
910 }
911 }
912
913 Ok(())
914 })
915 }
916
917 pub fn update_context_item(
923 &mut self,
924 session_id: &str,
925 key: &str,
926 value: Option<&str>,
927 category: Option<&str>,
928 priority: Option<&str>,
929 channel: Option<&str>,
930 actor: &str,
931 ) -> Result<()> {
932 self.mutate("update_context_item", actor, |tx, ctx| {
933 let now = chrono::Utc::now().timestamp_millis();
934
935 let mut set_parts: Vec<&str> = vec!["updated_at"];
937 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
938
939 if let Some(v) = value {
940 set_parts.push("value");
941 set_parts.push("size");
942 params.push(Box::new(v.to_string()));
943 params.push(Box::new(v.len() as i64));
944 }
945 if let Some(c) = category {
946 set_parts.push("category");
947 params.push(Box::new(c.to_string()));
948 }
949 if let Some(p) = priority {
950 set_parts.push("priority");
951 params.push(Box::new(p.to_string()));
952 }
953 if let Some(ch) = channel {
954 set_parts.push("channel");
955 params.push(Box::new(ch.to_string()));
956 }
957
958 let item_id: Option<String> = tx
960 .query_row(
961 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
962 rusqlite::params![session_id, key],
963 |row| row.get(0),
964 )
965 .optional()?;
966
967 if item_id.is_none() {
968 return Err(Error::Database(rusqlite::Error::QueryReturnedNoRows));
969 }
970
971 let set_clause: String = set_parts
973 .iter()
974 .enumerate()
975 .map(|(i, field)| format!("{} = ?{}", field, i + 1))
976 .collect::<Vec<_>>()
977 .join(", ");
978
979 let param_count = params.len();
980 let query = format!(
981 "UPDATE context_items SET {} WHERE session_id = ?{} AND key = ?{}",
982 set_clause,
983 param_count + 1,
984 param_count + 2
985 );
986
987 params.push(Box::new(session_id.to_string()));
988 params.push(Box::new(key.to_string()));
989
990 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
991 tx.execute(&query, param_refs.as_slice())?;
992
993 if let Some(id) = item_id {
994 ctx.record_event("context_item", &id, EventType::ItemUpdated);
995 }
996
997 Ok(())
998 })
999 }
1000
1001 pub fn add_tags_to_item(
1007 &mut self,
1008 session_id: &str,
1009 key: &str,
1010 tags_to_add: &[String],
1011 actor: &str,
1012 ) -> Result<()> {
1013 self.mutate("add_tags_to_item", actor, |tx, ctx| {
1014 let now = chrono::Utc::now().timestamp_millis();
1015
1016 let (item_id, current_tags): (String, String) = tx.query_row(
1018 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1019 rusqlite::params![session_id, key],
1020 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1021 )?;
1022
1023 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
1025
1026 for tag in tags_to_add {
1028 if !tags.contains(tag) {
1029 tags.push(tag.clone());
1030 }
1031 }
1032
1033 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1035 tx.execute(
1036 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1037 rusqlite::params![new_tags, now, item_id],
1038 )?;
1039
1040 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1041
1042 Ok(())
1043 })
1044 }
1045
1046 pub fn remove_tags_from_item(
1052 &mut self,
1053 session_id: &str,
1054 key: &str,
1055 tags_to_remove: &[String],
1056 actor: &str,
1057 ) -> Result<()> {
1058 self.mutate("remove_tags_from_item", actor, |tx, ctx| {
1059 let now = chrono::Utc::now().timestamp_millis();
1060
1061 let (item_id, current_tags): (String, String) = tx.query_row(
1063 "SELECT id, tags FROM context_items WHERE session_id = ?1 AND key = ?2",
1064 rusqlite::params![session_id, key],
1065 |row| Ok((row.get(0)?, row.get::<_, Option<String>>(1)?.unwrap_or_else(|| "[]".to_string()))),
1066 )?;
1067
1068 let mut tags: Vec<String> = serde_json::from_str(¤t_tags).unwrap_or_default();
1070
1071 tags.retain(|t| !tags_to_remove.contains(t));
1073
1074 let new_tags = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".to_string());
1076 tx.execute(
1077 "UPDATE context_items SET tags = ?1, updated_at = ?2 WHERE id = ?3",
1078 rusqlite::params![new_tags, now, item_id],
1079 )?;
1080
1081 ctx.record_event("context_item", &item_id, EventType::ItemUpdated);
1082
1083 Ok(())
1084 })
1085 }
1086
1087 #[allow(clippy::too_many_arguments)]
1097 pub fn create_issue(
1098 &mut self,
1099 id: &str,
1100 short_id: Option<&str>,
1101 project_path: &str,
1102 title: &str,
1103 description: Option<&str>,
1104 details: Option<&str>,
1105 issue_type: Option<&str>,
1106 priority: Option<i32>,
1107 plan_id: Option<&str>,
1108 actor: &str,
1109 ) -> Result<()> {
1110 let now = chrono::Utc::now().timestamp_millis();
1111 let issue_type = issue_type.unwrap_or("task");
1112 let priority = priority.unwrap_or(2);
1113
1114 self.mutate("create_issue", actor, |tx, ctx| {
1115 tx.execute(
1116 "INSERT INTO issues (id, short_id, project_path, title, description, details, issue_type, priority, plan_id, status, created_by_agent, created_at, updated_at)
1117 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, 'open', ?10, ?11, ?11)",
1118 rusqlite::params![id, short_id, project_path, title, description, details, issue_type, priority, plan_id, actor, now],
1119 )?;
1120
1121 ctx.record_event("issue", id, EventType::IssueCreated);
1122 ctx.mark_issue_dirty(id);
1123
1124 Ok(())
1125 })
1126 }
1127
1128 pub fn get_issue(&self, id: &str, project_path: Option<&str>) -> Result<Option<Issue>> {
1134 let sql = if project_path.is_some() {
1136 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1137 FROM issues WHERE (id = ?1 OR short_id = ?1) AND project_path = ?2"
1138 } else {
1139 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1140 FROM issues WHERE id = ?1 OR short_id = ?1"
1141 };
1142
1143 let mut stmt = self.conn.prepare(sql)?;
1144
1145 let issue = if let Some(path) = project_path {
1146 stmt.query_row(rusqlite::params![id, path], map_issue_row)
1147 } else {
1148 stmt.query_row([id], map_issue_row)
1149 }
1150 .optional()?;
1151
1152 Ok(issue)
1153 }
1154
1155 pub fn list_issues(
1161 &self,
1162 project_path: &str,
1163 status: Option<&str>,
1164 issue_type: Option<&str>,
1165 limit: Option<u32>,
1166 ) -> Result<Vec<Issue>> {
1167 let limit = limit.unwrap_or(50);
1168
1169 let mut sql = String::from(
1170 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1171 FROM issues WHERE project_path = ?1",
1172 );
1173
1174 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(project_path.to_string())];
1175
1176 if let Some(st) = status {
1177 if st != "all" {
1178 sql.push_str(" AND status = ?");
1179 params.push(Box::new(st.to_string()));
1180 }
1181 } else {
1182 sql.push_str(" AND status != 'closed'");
1184 }
1185
1186 if let Some(t) = issue_type {
1187 sql.push_str(" AND issue_type = ?");
1188 params.push(Box::new(t.to_string()));
1189 }
1190
1191 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1192 params.push(Box::new(limit));
1193
1194 let mut stmt = self.conn.prepare(&sql)?;
1195 let params_refs: Vec<&dyn rusqlite::ToSql> = params
1196 .iter()
1197 .map(|b| b.as_ref())
1198 .collect();
1199
1200 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1201
1202 rows.collect::<std::result::Result<Vec<_>, _>>()
1203 .map_err(Error::from)
1204 }
1205
1206 pub fn list_all_issues(
1212 &self,
1213 status: Option<&str>,
1214 issue_type: Option<&str>,
1215 limit: Option<u32>,
1216 ) -> Result<Vec<Issue>> {
1217 let limit = limit.unwrap_or(50);
1218
1219 let mut sql = String::from(
1220 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
1221 FROM issues WHERE 1=1",
1222 );
1223
1224 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
1225
1226 if let Some(st) = status {
1227 if st != "all" {
1228 sql.push_str(" AND status = ?");
1229 params.push(Box::new(st.to_string()));
1230 }
1231 } else {
1232 sql.push_str(" AND status != 'closed'");
1234 }
1235
1236 if let Some(t) = issue_type {
1237 sql.push_str(" AND issue_type = ?");
1238 params.push(Box::new(t.to_string()));
1239 }
1240
1241 sql.push_str(" ORDER BY priority DESC, created_at ASC LIMIT ?");
1242 params.push(Box::new(limit));
1243
1244 let mut stmt = self.conn.prepare(&sql)?;
1245 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
1246
1247 let rows = stmt.query_map(params_refs.as_slice(), map_issue_row)?;
1248
1249 rows.collect::<std::result::Result<Vec<_>, _>>()
1250 .map_err(Error::from)
1251 }
1252
1253 pub fn update_issue_status(
1261 &mut self,
1262 id: &str,
1263 status: &str,
1264 actor: &str,
1265 ) -> Result<()> {
1266 let now = chrono::Utc::now().timestamp_millis();
1267 let closed_at = if status == "closed" { Some(now) } else { None };
1268
1269 self.mutate("update_issue_status", actor, |tx, ctx| {
1270 let rows = tx.execute(
1271 "UPDATE issues SET status = ?1, closed_at = ?2, closed_by_agent = ?3, updated_at = ?4 WHERE id = ?5 OR short_id = ?5",
1272 rusqlite::params![status, closed_at, if status == "closed" { Some(actor) } else { None }, now, id],
1273 )?;
1274
1275 if rows == 0 {
1276 return Err(Error::IssueNotFound { id: id.to_string() });
1277 }
1278
1279 let event_type = if status == "closed" {
1280 EventType::IssueClosed
1281 } else {
1282 EventType::IssueUpdated
1283 };
1284 ctx.record_event("issue", id, event_type);
1285 ctx.mark_issue_dirty(id);
1286
1287 Ok(())
1288 })
1289 }
1290
1291 #[allow(clippy::too_many_arguments)]
1299 pub fn update_issue(
1300 &mut self,
1301 id: &str,
1302 title: Option<&str>,
1303 description: Option<&str>,
1304 details: Option<&str>,
1305 priority: Option<i32>,
1306 issue_type: Option<&str>,
1307 plan_id: Option<&str>,
1308 parent_id: Option<&str>,
1309 actor: &str,
1310 ) -> Result<()> {
1311 let now = chrono::Utc::now().timestamp_millis();
1312
1313 let mut set_clauses = vec!["updated_at = ?"];
1315 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
1316
1317 if let Some(t) = title {
1318 set_clauses.push("title = ?");
1319 params.push(Box::new(t.to_string()));
1320 }
1321 if let Some(d) = description {
1322 set_clauses.push("description = ?");
1323 params.push(Box::new(d.to_string()));
1324 }
1325 if let Some(dt) = details {
1326 set_clauses.push("details = ?");
1327 params.push(Box::new(dt.to_string()));
1328 }
1329 if let Some(p) = priority {
1330 set_clauses.push("priority = ?");
1331 params.push(Box::new(p));
1332 }
1333 if let Some(it) = issue_type {
1334 set_clauses.push("issue_type = ?");
1335 params.push(Box::new(it.to_string()));
1336 }
1337 if let Some(pid) = plan_id {
1338 set_clauses.push("plan_id = ?");
1339 params.push(Box::new(pid.to_string()));
1340 }
1341
1342 if set_clauses.len() == 1 && parent_id.is_none() {
1344 return Ok(());
1345 }
1346
1347 self.mutate("update_issue", actor, |tx, ctx| {
1348 if set_clauses.len() > 1 {
1350 let sql = format!(
1351 "UPDATE issues SET {} WHERE id = ? OR short_id = ?",
1352 set_clauses.join(", ")
1353 );
1354 params.push(Box::new(id.to_string()));
1355 params.push(Box::new(id.to_string()));
1356
1357 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
1358 let rows = tx.execute(&sql, param_refs.as_slice())?;
1359
1360 if rows == 0 {
1361 return Err(Error::IssueNotFound { id: id.to_string() });
1362 }
1363 }
1364
1365 if let Some(new_parent) = parent_id {
1367 let full_id: String = tx.query_row(
1369 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1370 [id],
1371 |row| row.get(0),
1372 )?;
1373
1374 tx.execute(
1376 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND dependency_type = 'parent-child'",
1377 [&full_id],
1378 )?;
1379
1380 if !new_parent.is_empty() {
1382 let parent_full_id: String = tx.query_row(
1383 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1384 [new_parent],
1385 |row| row.get(0),
1386 )?;
1387
1388 tx.execute(
1389 "INSERT INTO issue_dependencies (issue_id, depends_on_id, dependency_type, created_at)
1390 VALUES (?1, ?2, 'parent-child', ?3)",
1391 rusqlite::params![full_id, parent_full_id, now],
1392 )?;
1393 }
1394 }
1395
1396 ctx.record_event("issue", id, EventType::IssueUpdated);
1397 ctx.mark_issue_dirty(id);
1398
1399 Ok(())
1400 })
1401 }
1402
1403 pub fn claim_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1411 let now = chrono::Utc::now().timestamp_millis();
1412
1413 self.mutate("claim_issue", actor, |tx, ctx| {
1414 let rows = tx.execute(
1415 "UPDATE issues SET assigned_to_agent = ?1, assigned_at = ?2, status = 'in_progress', updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
1416 rusqlite::params![actor, now, id],
1417 )?;
1418
1419 if rows == 0 {
1420 return Err(Error::IssueNotFound { id: id.to_string() });
1421 }
1422
1423 ctx.record_event("issue", id, EventType::IssueClaimed);
1424 ctx.mark_issue_dirty(id);
1425
1426 Ok(())
1427 })
1428 }
1429
1430 pub fn release_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1438 let now = chrono::Utc::now().timestamp_millis();
1439
1440 self.mutate("release_issue", actor, |tx, ctx| {
1441 let rows = tx.execute(
1442 "UPDATE issues SET assigned_to_agent = NULL, assigned_at = NULL, status = 'open', updated_at = ?1 WHERE id = ?2 OR short_id = ?2",
1443 rusqlite::params![now, id],
1444 )?;
1445
1446 if rows == 0 {
1447 return Err(Error::IssueNotFound { id: id.to_string() });
1448 }
1449
1450 ctx.record_event("issue", id, EventType::IssueReleased);
1451 ctx.mark_issue_dirty(id);
1452
1453 Ok(())
1454 })
1455 }
1456
1457 pub fn delete_issue(&mut self, id: &str, actor: &str) -> Result<()> {
1465 self.mutate("delete_issue", actor, |tx, ctx| {
1466 let info: Option<(String, String)> = tx
1468 .query_row(
1469 "SELECT id, project_path FROM issues WHERE id = ?1 OR short_id = ?1",
1470 [id],
1471 |row| Ok((row.get(0)?, row.get(1)?)),
1472 )
1473 .optional()?;
1474
1475 let (full_id, project_path) =
1476 info.ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1477
1478 tx.execute(
1480 "DELETE FROM issue_dependencies WHERE issue_id = ?1 OR depends_on_id = ?1",
1481 [&full_id],
1482 )?;
1483
1484 let rows = tx.execute("DELETE FROM issues WHERE id = ?1", [&full_id])?;
1486
1487 if rows == 0 {
1488 return Err(Error::IssueNotFound { id: id.to_string() });
1489 }
1490
1491 ctx.record_event("issue", &full_id, EventType::IssueDeleted);
1492
1493 let now = chrono::Utc::now().timestamp_millis();
1495 tx.execute(
1496 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
1497 VALUES ('issue', ?1, ?2, ?3, ?4, 0)
1498 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
1499 deleted_at = excluded.deleted_at,
1500 deleted_by = excluded.deleted_by,
1501 exported = 0",
1502 rusqlite::params![full_id, project_path, now, ctx.actor],
1503 )?;
1504
1505 Ok(())
1506 })
1507 }
1508
1509 pub fn add_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1515 self.mutate("add_issue_labels", actor, |tx, ctx| {
1516 let full_id: String = tx
1518 .query_row(
1519 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1520 [id],
1521 |row| row.get(0),
1522 )
1523 .optional()?
1524 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1525
1526 for label in labels {
1527 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1528 tx.execute(
1529 "INSERT OR IGNORE INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1530 rusqlite::params![label_id, full_id, label],
1531 )?;
1532 }
1533
1534 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1535 Ok(())
1536 })
1537 }
1538
1539 pub fn remove_issue_labels(&mut self, id: &str, labels: &[String], actor: &str) -> Result<()> {
1545 self.mutate("remove_issue_labels", actor, |tx, ctx| {
1546 let full_id: String = tx
1548 .query_row(
1549 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1550 [id],
1551 |row| row.get(0),
1552 )
1553 .optional()?
1554 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1555
1556 for label in labels {
1557 tx.execute(
1558 "DELETE FROM issue_labels WHERE issue_id = ?1 AND label = ?2",
1559 rusqlite::params![full_id, label],
1560 )?;
1561 }
1562
1563 ctx.record_event("issue", &full_id, EventType::IssueUpdated);
1564 Ok(())
1565 })
1566 }
1567
1568 pub fn get_issue_labels(&self, id: &str) -> Result<Vec<String>> {
1574 let full_id: String = self
1575 .conn
1576 .query_row(
1577 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1578 [id],
1579 |row| row.get(0),
1580 )
1581 .optional()?
1582 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1583
1584 let mut stmt = self
1585 .conn
1586 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1 ORDER BY label")?;
1587 let labels = stmt
1588 .query_map([&full_id], |row| row.get(0))?
1589 .collect::<std::result::Result<Vec<String>, _>>()?;
1590 Ok(labels)
1591 }
1592
1593 pub fn issue_has_dependencies(&self, id: &str) -> Result<bool> {
1595 let full_id: String = self
1596 .conn
1597 .query_row(
1598 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1599 [id],
1600 |row| row.get(0),
1601 )
1602 .optional()?
1603 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1604
1605 let count: i64 = self.conn.query_row(
1606 "SELECT COUNT(*) FROM issue_dependencies WHERE issue_id = ?1",
1607 [&full_id],
1608 |row| row.get(0),
1609 )?;
1610 Ok(count > 0)
1611 }
1612
1613 pub fn issue_has_subtasks(&self, id: &str) -> Result<bool> {
1615 let full_id: String = self
1616 .conn
1617 .query_row(
1618 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1619 [id],
1620 |row| row.get(0),
1621 )
1622 .optional()?
1623 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1624
1625 let count: i64 = self.conn.query_row(
1626 "SELECT COUNT(*) FROM issue_dependencies WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1627 [&full_id],
1628 |row| row.get(0),
1629 )?;
1630 Ok(count > 0)
1631 }
1632
1633 pub fn get_child_issue_ids(&self, parent_id: &str) -> Result<std::collections::HashSet<String>> {
1637 let full_parent_id: String = self
1639 .conn
1640 .query_row(
1641 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1642 [parent_id],
1643 |row| row.get(0),
1644 )
1645 .optional()?
1646 .ok_or_else(|| Error::IssueNotFound { id: parent_id.to_string() })?;
1647
1648 let mut stmt = self.conn.prepare(
1649 "SELECT issue_id FROM issue_dependencies
1650 WHERE depends_on_id = ?1 AND dependency_type = 'parent-child'",
1651 )?;
1652
1653 let rows = stmt.query_map([&full_parent_id], |row| row.get::<_, String>(0))?;
1654
1655 let mut ids = std::collections::HashSet::new();
1656 for row in rows {
1657 ids.insert(row?);
1658 }
1659 Ok(ids)
1660 }
1661
1662 pub fn add_issue_dependency(
1668 &mut self,
1669 issue_id: &str,
1670 depends_on_id: &str,
1671 dependency_type: &str,
1672 actor: &str,
1673 ) -> Result<()> {
1674 self.mutate("add_issue_dependency", actor, |tx, ctx| {
1675 let full_issue_id: String = tx
1677 .query_row(
1678 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1679 [issue_id],
1680 |row| row.get(0),
1681 )
1682 .optional()?
1683 .ok_or_else(|| Error::IssueNotFound {
1684 id: issue_id.to_string(),
1685 })?;
1686
1687 let full_depends_on_id: String = tx
1688 .query_row(
1689 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1690 [depends_on_id],
1691 |row| row.get(0),
1692 )
1693 .optional()?
1694 .ok_or_else(|| Error::IssueNotFound {
1695 id: depends_on_id.to_string(),
1696 })?;
1697
1698 let dep_id = format!("dep_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1699 let now = chrono::Utc::now().timestamp_millis();
1700
1701 tx.execute(
1702 "INSERT OR IGNORE INTO issue_dependencies (id, issue_id, depends_on_id, dependency_type, created_at)
1703 VALUES (?1, ?2, ?3, ?4, ?5)",
1704 rusqlite::params![dep_id, full_issue_id, full_depends_on_id, dependency_type, now],
1705 )?;
1706
1707 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1708 Ok(())
1709 })
1710 }
1711
1712 pub fn remove_issue_dependency(
1718 &mut self,
1719 issue_id: &str,
1720 depends_on_id: &str,
1721 actor: &str,
1722 ) -> Result<()> {
1723 self.mutate("remove_issue_dependency", actor, |tx, ctx| {
1724 let full_issue_id: String = tx
1726 .query_row(
1727 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1728 [issue_id],
1729 |row| row.get(0),
1730 )
1731 .optional()?
1732 .ok_or_else(|| Error::IssueNotFound {
1733 id: issue_id.to_string(),
1734 })?;
1735
1736 let full_depends_on_id: String = tx
1737 .query_row(
1738 "SELECT id FROM issues WHERE id = ?1 OR short_id = ?1",
1739 [depends_on_id],
1740 |row| row.get(0),
1741 )
1742 .optional()?
1743 .ok_or_else(|| Error::IssueNotFound {
1744 id: depends_on_id.to_string(),
1745 })?;
1746
1747 tx.execute(
1748 "DELETE FROM issue_dependencies WHERE issue_id = ?1 AND depends_on_id = ?2",
1749 rusqlite::params![full_issue_id, full_depends_on_id],
1750 )?;
1751
1752 ctx.record_event("issue", &full_issue_id, EventType::IssueUpdated);
1753 Ok(())
1754 })
1755 }
1756
1757 pub fn clone_issue(
1763 &mut self,
1764 id: &str,
1765 new_title: Option<&str>,
1766 actor: &str,
1767 ) -> Result<Issue> {
1768 let source = self
1770 .get_issue(id, None)?
1771 .ok_or_else(|| Error::IssueNotFound { id: id.to_string() })?;
1772
1773 let new_id = format!("issue_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1774 let new_short_id = generate_short_id();
1775 let default_title = format!("Copy of {}", source.title);
1776 let title = new_title.unwrap_or(&default_title);
1777 let now = chrono::Utc::now().timestamp_millis();
1778
1779 self.mutate("clone_issue", actor, |tx, ctx| {
1780 tx.execute(
1781 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, created_at, updated_at)
1782 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'open', ?7, ?8, ?9, ?10, ?11, ?11)",
1783 rusqlite::params![
1784 new_id,
1785 new_short_id,
1786 source.project_path,
1787 title,
1788 source.description,
1789 source.details,
1790 source.priority,
1791 source.issue_type,
1792 source.plan_id,
1793 ctx.actor,
1794 now
1795 ],
1796 )?;
1797
1798 let labels: Vec<String> = tx
1800 .prepare("SELECT label FROM issue_labels WHERE issue_id = ?1")?
1801 .query_map([&source.id], |row| row.get(0))?
1802 .collect::<std::result::Result<Vec<String>, _>>()?;
1803
1804 for label in &labels {
1805 let label_id = format!("label_{}", &uuid::Uuid::new_v4().to_string()[..12]);
1806 tx.execute(
1807 "INSERT INTO issue_labels (id, issue_id, label) VALUES (?1, ?2, ?3)",
1808 rusqlite::params![label_id, new_id, label],
1809 )?;
1810 }
1811
1812 ctx.record_event("issue", &new_id, EventType::IssueCreated);
1813 Ok(())
1814 })?;
1815
1816 self.get_issue(&new_id, None)?
1818 .ok_or_else(|| Error::Other("Failed to retrieve cloned issue".to_string()))
1819 }
1820
1821 pub fn mark_issue_duplicate(
1827 &mut self,
1828 id: &str,
1829 duplicate_of_id: &str,
1830 actor: &str,
1831 ) -> Result<()> {
1832 self.add_issue_dependency(id, duplicate_of_id, "duplicate-of", actor)?;
1834
1835 self.update_issue_status(id, "closed", actor)?;
1837
1838 Ok(())
1839 }
1840
1841 pub fn get_ready_issues(&self, project_path: &str, limit: u32) -> Result<Vec<Issue>> {
1847 let mut stmt = self.conn.prepare(
1848 "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1849 i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1850 i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1851 FROM issues i
1852 WHERE i.project_path = ?1
1853 AND i.status = 'open'
1854 AND i.assigned_to_agent IS NULL
1855 AND NOT EXISTS (
1856 SELECT 1 FROM issue_dependencies d
1857 JOIN issues dep ON dep.id = d.depends_on_id
1858 WHERE d.issue_id = i.id
1859 AND d.dependency_type = 'blocks'
1860 AND dep.status != 'closed'
1861 )
1862 ORDER BY i.priority DESC, i.created_at ASC
1863 LIMIT ?2",
1864 )?;
1865
1866 let issues = stmt
1867 .query_map(rusqlite::params![project_path, limit], |row| {
1868 Ok(Issue {
1869 id: row.get(0)?,
1870 short_id: row.get(1)?,
1871 project_path: row.get(2)?,
1872 title: row.get(3)?,
1873 description: row.get(4)?,
1874 details: row.get(5)?,
1875 status: row.get(6)?,
1876 priority: row.get(7)?,
1877 issue_type: row.get(8)?,
1878 plan_id: row.get(9)?,
1879 created_by_agent: row.get(10)?,
1880 assigned_to_agent: row.get(11)?,
1881 created_at: row.get(12)?,
1882 updated_at: row.get(13)?,
1883 closed_at: row.get(14)?,
1884 })
1885 })?
1886 .collect::<std::result::Result<Vec<_>, _>>()?;
1887
1888 Ok(issues)
1889 }
1890
1891 pub fn get_next_issue_block(
1897 &mut self,
1898 project_path: &str,
1899 count: u32,
1900 actor: &str,
1901 ) -> Result<Vec<Issue>> {
1902 let ready = self.get_ready_issues(project_path, count)?;
1903
1904 for issue in &ready {
1905 self.claim_issue(&issue.id, actor)?;
1906 }
1907
1908 let claimed: Vec<Issue> = ready
1910 .iter()
1911 .filter_map(|i| self.get_issue(&i.id, None).ok().flatten())
1912 .collect();
1913
1914 Ok(claimed)
1915 }
1916
1917 pub fn count_issues_grouped(
1923 &self,
1924 project_path: &str,
1925 group_by: &str,
1926 ) -> Result<Vec<(String, i64)>> {
1927 let column = match group_by {
1928 "status" => "status",
1929 "type" => "issue_type",
1930 "priority" => "CAST(priority AS TEXT)",
1931 "assignee" => "COALESCE(assigned_to_agent, 'unassigned')",
1932 _ => return Err(Error::InvalidArgument(
1933 format!("Invalid group_by '{group_by}'. Valid: status, type, priority, assignee")
1934 )),
1935 };
1936
1937 let sql = format!(
1938 "SELECT {column}, COUNT(*) as count FROM issues \
1939 WHERE project_path = ?1 GROUP BY {column} ORDER BY count DESC"
1940 );
1941
1942 let mut stmt = self.conn.prepare(&sql)?;
1943 let rows = stmt.query_map([project_path], |row| {
1944 Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
1945 })?;
1946
1947 Ok(rows.collect::<std::result::Result<Vec<_>, _>>()?)
1948 }
1949
1950 pub fn get_stale_issues(
1952 &self,
1953 project_path: &str,
1954 stale_days: u64,
1955 limit: u32,
1956 ) -> Result<Vec<Issue>> {
1957 let cutoff_ms = chrono::Utc::now().timestamp_millis()
1958 - (stale_days as i64 * 24 * 60 * 60 * 1000);
1959
1960 let mut stmt = self.conn.prepare(
1961 "SELECT id, short_id, project_path, title, description, details,
1962 status, priority, issue_type, plan_id, created_by_agent,
1963 assigned_to_agent, created_at, updated_at, closed_at
1964 FROM issues
1965 WHERE project_path = ?1
1966 AND status IN ('open', 'in_progress', 'blocked')
1967 AND updated_at < ?2
1968 ORDER BY updated_at ASC
1969 LIMIT ?3",
1970 )?;
1971
1972 let issues = stmt
1973 .query_map(rusqlite::params![project_path, cutoff_ms, limit], map_issue_row)?
1974 .collect::<std::result::Result<Vec<_>, _>>()?;
1975
1976 Ok(issues)
1977 }
1978
1979 pub fn get_blocked_issues(
1981 &self,
1982 project_path: &str,
1983 limit: u32,
1984 ) -> Result<Vec<(Issue, Vec<Issue>)>> {
1985 let mut stmt = self.conn.prepare(
1986 "SELECT i.id, i.short_id, i.project_path, i.title, i.description, i.details,
1987 i.status, i.priority, i.issue_type, i.plan_id, i.created_by_agent,
1988 i.assigned_to_agent, i.created_at, i.updated_at, i.closed_at
1989 FROM issues i
1990 WHERE i.project_path = ?1
1991 AND i.status NOT IN ('closed', 'deferred')
1992 AND EXISTS (
1993 SELECT 1 FROM issue_dependencies d
1994 JOIN issues dep ON dep.id = d.depends_on_id
1995 WHERE d.issue_id = i.id
1996 AND d.dependency_type = 'blocks'
1997 AND dep.status != 'closed'
1998 )
1999 ORDER BY i.priority DESC, i.created_at ASC
2000 LIMIT ?2",
2001 )?;
2002
2003 let blocked_issues = stmt
2004 .query_map(rusqlite::params![project_path, limit], map_issue_row)?
2005 .collect::<std::result::Result<Vec<_>, _>>()?;
2006
2007 let mut blocker_stmt = self.conn.prepare(
2008 "SELECT dep.id, dep.short_id, dep.project_path, dep.title, dep.description, dep.details,
2009 dep.status, dep.priority, dep.issue_type, dep.plan_id, dep.created_by_agent,
2010 dep.assigned_to_agent, dep.created_at, dep.updated_at, dep.closed_at
2011 FROM issue_dependencies d
2012 JOIN issues dep ON dep.id = d.depends_on_id
2013 WHERE d.issue_id = ?1
2014 AND d.dependency_type = 'blocks'
2015 AND dep.status != 'closed'",
2016 )?;
2017
2018 let mut results = Vec::with_capacity(blocked_issues.len());
2019 for issue in blocked_issues {
2020 let blockers = blocker_stmt
2021 .query_map([&issue.id], map_issue_row)?
2022 .collect::<std::result::Result<Vec<_>, _>>()?;
2023 results.push((issue, blockers));
2024 }
2025
2026 Ok(results)
2027 }
2028
2029 pub fn get_epic_progress(&self, epic_id: &str) -> Result<EpicProgress> {
2031 let mut stmt = self.conn.prepare(
2032 "SELECT child.status, COUNT(*) as count
2033 FROM issue_dependencies d
2034 JOIN issues child ON child.id = d.issue_id
2035 WHERE d.depends_on_id = ?1
2036 AND d.dependency_type = 'parent-child'
2037 GROUP BY child.status",
2038 )?;
2039
2040 let rows = stmt
2041 .query_map([epic_id], |row| {
2042 Ok((row.get::<_, String>(0)?, row.get::<_, usize>(1)?))
2043 })?
2044 .collect::<std::result::Result<Vec<_>, _>>()?;
2045
2046 let mut progress = EpicProgress::default();
2047 for (status, count) in rows {
2048 match status.as_str() {
2049 "closed" => progress.closed += count,
2050 "in_progress" => progress.in_progress += count,
2051 "open" => progress.open += count,
2052 "blocked" => progress.blocked += count,
2053 "deferred" => progress.deferred += count,
2054 _ => progress.open += count,
2055 }
2056 progress.total += count;
2057 }
2058
2059 Ok(progress)
2060 }
2061
2062 pub fn get_dependency_tree(&self, root_id: &str) -> Result<Vec<(Issue, i32)>> {
2065 let root = self.get_issue(root_id, None)?
2067 .ok_or_else(|| Error::IssueNotFound { id: root_id.to_string() })?;
2068
2069 let root_full_id = root.id.clone();
2070 let mut result = vec![(root, 0)];
2071 let mut queue = vec![(root_full_id.clone(), 0i32)];
2072 let mut visited = std::collections::HashSet::new();
2073 visited.insert(root_full_id);
2074
2075 let mut child_stmt = self.conn.prepare(
2076 "SELECT child.id, child.short_id, child.project_path, child.title,
2077 child.description, child.details, child.status, child.priority,
2078 child.issue_type, child.plan_id, child.created_by_agent,
2079 child.assigned_to_agent, child.created_at, child.updated_at,
2080 child.closed_at
2081 FROM issue_dependencies d
2082 JOIN issues child ON child.id = d.issue_id
2083 WHERE d.depends_on_id = ?1
2084 AND d.dependency_type IN ('parent-child', 'blocks')
2085 ORDER BY child.priority DESC, child.created_at ASC",
2086 )?;
2087
2088 while let Some((parent_id, depth)) = queue.pop() {
2089 let children = child_stmt
2090 .query_map([&parent_id], map_issue_row)?
2091 .collect::<std::result::Result<Vec<_>, _>>()?;
2092
2093 for child in children {
2094 if visited.insert(child.id.clone()) {
2095 let child_id = child.id.clone();
2096 result.push((child, depth + 1));
2097 queue.push((child_id, depth + 1));
2098 }
2099 }
2100 }
2101
2102 Ok(result)
2103 }
2104
2105 pub fn get_epics(&self, project_path: &str) -> Result<Vec<Issue>> {
2107 let mut stmt = self.conn.prepare(
2108 "SELECT id, short_id, project_path, title, description, details,
2109 status, priority, issue_type, plan_id, created_by_agent,
2110 assigned_to_agent, created_at, updated_at, closed_at
2111 FROM issues
2112 WHERE project_path = ?1
2113 AND issue_type = 'epic'
2114 AND status != 'closed'
2115 ORDER BY priority DESC, created_at ASC",
2116 )?;
2117
2118 let issues = stmt
2119 .query_map([project_path], map_issue_row)?
2120 .collect::<std::result::Result<Vec<_>, _>>()?;
2121
2122 Ok(issues)
2123 }
2124
2125 pub fn set_close_reason(
2127 &mut self,
2128 id: &str,
2129 reason: &str,
2130 actor: &str,
2131 ) -> Result<()> {
2132 let now = chrono::Utc::now().timestamp_millis();
2133 self.mutate("set_close_reason", actor, |tx, _ctx| {
2134 let rows = tx.execute(
2135 "UPDATE issues SET close_reason = ?1, updated_at = ?2 WHERE id = ?3 OR short_id = ?3",
2136 rusqlite::params![reason, now, id],
2137 )?;
2138 if rows == 0 {
2139 return Err(Error::IssueNotFound { id: id.to_string() });
2140 }
2141 Ok(())
2142 })
2143 }
2144
2145 pub fn get_close_reason(&self, id: &str) -> Result<Option<String>> {
2147 let result = self.conn.query_row(
2148 "SELECT close_reason FROM issues WHERE id = ?1 OR short_id = ?1",
2149 [id],
2150 |row| row.get(0),
2151 );
2152 match result {
2153 Ok(reason) => Ok(reason),
2154 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
2155 Err(e) => Err(e.into()),
2156 }
2157 }
2158
2159 #[allow(clippy::too_many_arguments)]
2169 pub fn create_checkpoint(
2170 &mut self,
2171 id: &str,
2172 session_id: &str,
2173 name: &str,
2174 description: Option<&str>,
2175 git_status: Option<&str>,
2176 git_branch: Option<&str>,
2177 actor: &str,
2178 ) -> Result<()> {
2179 let now = chrono::Utc::now().timestamp_millis();
2180
2181 self.mutate("create_checkpoint", actor, |tx, ctx| {
2182 tx.execute(
2183 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
2184 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
2185 rusqlite::params![id, session_id, name, description, git_status, git_branch, now],
2186 )?;
2187
2188 ctx.record_event("checkpoint", id, EventType::CheckpointCreated);
2189
2190 Ok(())
2191 })
2192 }
2193
2194 pub fn add_checkpoint_item(
2200 &mut self,
2201 checkpoint_id: &str,
2202 context_item_id: &str,
2203 actor: &str,
2204 ) -> Result<()> {
2205 let id = format!("cpitem_{}", &uuid::Uuid::new_v4().to_string()[..12]);
2206 self.mutate("add_checkpoint_item", actor, |tx, _ctx| {
2207 tx.execute(
2208 "INSERT OR IGNORE INTO checkpoint_items (id, checkpoint_id, context_item_id)
2209 VALUES (?1, ?2, ?3)",
2210 rusqlite::params![id, checkpoint_id, context_item_id],
2211 )?;
2212
2213 Ok(())
2214 })
2215 }
2216
2217 pub fn count_items_since_last_checkpoint(&self, session_id: &str) -> Result<i64> {
2225 let last_checkpoint_time: Option<i64> = self.conn.query_row(
2226 "SELECT MAX(created_at) FROM checkpoints WHERE session_id = ?1",
2227 [session_id],
2228 |row| row.get(0),
2229 )?;
2230
2231 let count = if let Some(ts) = last_checkpoint_time {
2232 self.conn.query_row(
2233 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND created_at > ?2",
2234 rusqlite::params![session_id, ts],
2235 |row| row.get(0),
2236 )?
2237 } else {
2238 self.conn.query_row(
2240 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1",
2241 [session_id],
2242 |row| row.get(0),
2243 )?
2244 };
2245
2246 Ok(count)
2247 }
2248
2249 pub fn list_checkpoints(
2255 &self,
2256 session_id: &str,
2257 limit: Option<u32>,
2258 ) -> Result<Vec<Checkpoint>> {
2259 let limit = limit.unwrap_or(20);
2260
2261 let mut stmt = self.conn.prepare(
2262 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2263 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2264 FROM checkpoints c
2265 WHERE c.session_id = ?1
2266 ORDER BY c.created_at DESC
2267 LIMIT ?2",
2268 )?;
2269
2270 let rows = stmt.query_map(rusqlite::params![session_id, limit], |row| {
2271 Ok(Checkpoint {
2272 id: row.get(0)?,
2273 session_id: row.get(1)?,
2274 name: row.get(2)?,
2275 description: row.get(3)?,
2276 git_status: row.get(4)?,
2277 git_branch: row.get(5)?,
2278 created_at: row.get(6)?,
2279 item_count: row.get(7)?,
2280 })
2281 })?;
2282
2283 rows.collect::<std::result::Result<Vec<_>, _>>()
2284 .map_err(Error::from)
2285 }
2286
2287 pub fn get_checkpoint(&self, id: &str) -> Result<Option<Checkpoint>> {
2293 let mut stmt = self.conn.prepare(
2294 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
2295 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
2296 FROM checkpoints c
2297 WHERE c.id = ?1",
2298 )?;
2299
2300 let checkpoint = stmt
2301 .query_row([id], |row| {
2302 Ok(Checkpoint {
2303 id: row.get(0)?,
2304 session_id: row.get(1)?,
2305 name: row.get(2)?,
2306 description: row.get(3)?,
2307 git_status: row.get(4)?,
2308 git_branch: row.get(5)?,
2309 created_at: row.get(6)?,
2310 item_count: row.get(7)?,
2311 })
2312 })
2313 .optional()?;
2314
2315 Ok(checkpoint)
2316 }
2317
2318 pub fn delete_checkpoint(&mut self, id: &str, actor: &str) -> Result<()> {
2324 self.mutate("delete_checkpoint", actor, |tx, ctx| {
2325 let project_path: Option<Option<String>> = tx
2327 .query_row(
2328 "SELECT s.project_path FROM checkpoints c
2329 JOIN sessions s ON c.session_id = s.id
2330 WHERE c.id = ?1",
2331 [id],
2332 |row| row.get(0),
2333 )
2334 .optional()?;
2335
2336 tx.execute("DELETE FROM checkpoint_items WHERE checkpoint_id = ?1", [id])?;
2338
2339 let rows = tx.execute("DELETE FROM checkpoints WHERE id = ?1", [id])?;
2341
2342 if rows == 0 {
2343 return Err(Error::CheckpointNotFound { id: id.to_string() });
2344 }
2345
2346 ctx.record_event("checkpoint", id, EventType::CheckpointDeleted);
2347
2348 if let Some(Some(path)) = project_path {
2350 let now = chrono::Utc::now().timestamp_millis();
2351 tx.execute(
2352 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2353 VALUES ('checkpoint', ?1, ?2, ?3, ?4, 0)
2354 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2355 deleted_at = excluded.deleted_at,
2356 deleted_by = excluded.deleted_by,
2357 exported = 0",
2358 rusqlite::params![id, path, now, ctx.actor],
2359 )?;
2360 }
2361
2362 Ok(())
2363 })
2364 }
2365
2366 pub fn get_checkpoint_items(&self, checkpoint_id: &str) -> Result<Vec<ContextItem>> {
2372 let mut stmt = self.conn.prepare(
2373 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority,
2374 ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
2375 FROM context_items ci
2376 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2377 WHERE cpi.checkpoint_id = ?1
2378 ORDER BY ci.priority DESC, ci.created_at DESC",
2379 )?;
2380
2381 let rows = stmt.query_map([checkpoint_id], |row| {
2382 Ok(ContextItem {
2383 id: row.get(0)?,
2384 session_id: row.get(1)?,
2385 key: row.get(2)?,
2386 value: row.get(3)?,
2387 category: row.get(4)?,
2388 priority: row.get(5)?,
2389 channel: row.get(6)?,
2390 tags: row.get(7)?,
2391 size: row.get(8)?,
2392 created_at: row.get(9)?,
2393 updated_at: row.get(10)?,
2394 })
2395 })?;
2396
2397 rows.collect::<std::result::Result<Vec<_>, _>>()
2398 .map_err(Error::from)
2399 }
2400
2401 pub fn restore_checkpoint(
2410 &mut self,
2411 checkpoint_id: &str,
2412 target_session_id: &str,
2413 restore_categories: Option<&[String]>,
2414 restore_tags: Option<&[String]>,
2415 actor: &str,
2416 ) -> Result<usize> {
2417 let mut items = self.get_checkpoint_items(checkpoint_id)?;
2419
2420 if let Some(categories) = restore_categories {
2422 items.retain(|item| categories.contains(&item.category));
2423 }
2424
2425 if let Some(tags) = restore_tags {
2427 items.retain(|item| {
2428 if let Some(ref item_tags) = item.tags {
2430 if let Ok(parsed_tags) = serde_json::from_str::<Vec<String>>(item_tags) {
2431 return tags.iter().any(|t| parsed_tags.contains(t));
2432 }
2433 }
2434 false
2435 });
2436 }
2437
2438 let now = chrono::Utc::now().timestamp_millis();
2439
2440 self.mutate("restore_checkpoint", actor, |tx, ctx| {
2441 tx.execute(
2443 "DELETE FROM context_items WHERE session_id = ?1",
2444 [target_session_id],
2445 )?;
2446
2447 let mut restored = 0;
2449 for item in &items {
2450 let new_id = uuid::Uuid::new_v4().to_string();
2451 let size = item.value.len() as i64;
2452
2453 tx.execute(
2454 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
2455 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)",
2456 rusqlite::params![
2457 new_id,
2458 target_session_id,
2459 item.key,
2460 item.value,
2461 item.category,
2462 item.priority,
2463 item.channel,
2464 item.tags,
2465 size,
2466 now,
2467 ],
2468 )?;
2469
2470 ctx.record_event("context_item", &new_id, EventType::ItemCreated);
2471 restored += 1;
2472 }
2473
2474 Ok(restored)
2475 })
2476 }
2477
2478 pub fn remove_checkpoint_item(
2484 &mut self,
2485 checkpoint_id: &str,
2486 context_item_id: &str,
2487 actor: &str,
2488 ) -> Result<()> {
2489 self.mutate("remove_checkpoint_item", actor, |tx, _ctx| {
2490 tx.execute(
2491 "DELETE FROM checkpoint_items WHERE checkpoint_id = ?1 AND context_item_id = ?2",
2492 rusqlite::params![checkpoint_id, context_item_id],
2493 )?;
2494 Ok(())
2495 })
2496 }
2497
2498 pub fn add_checkpoint_items_by_keys(
2504 &mut self,
2505 checkpoint_id: &str,
2506 session_id: &str,
2507 keys: &[String],
2508 actor: &str,
2509 ) -> Result<usize> {
2510 let mut added = 0;
2511
2512 for key in keys {
2513 let item_id: Option<String> = self.conn.query_row(
2515 "SELECT id FROM context_items WHERE session_id = ?1 AND key = ?2",
2516 rusqlite::params![session_id, key],
2517 |row| row.get(0),
2518 ).optional()?;
2519
2520 if let Some(id) = item_id {
2521 self.add_checkpoint_item(checkpoint_id, &id, actor)?;
2522 added += 1;
2523 }
2524 }
2525
2526 Ok(added)
2527 }
2528
2529 pub fn remove_checkpoint_items_by_keys(
2535 &mut self,
2536 checkpoint_id: &str,
2537 keys: &[String],
2538 actor: &str,
2539 ) -> Result<usize> {
2540 let mut removed = 0;
2541
2542 for key in keys {
2543 let item_id: Option<String> = self.conn.query_row(
2545 "SELECT ci.id FROM context_items ci
2546 JOIN checkpoint_items cpi ON cpi.context_item_id = ci.id
2547 WHERE cpi.checkpoint_id = ?1 AND ci.key = ?2",
2548 rusqlite::params![checkpoint_id, key],
2549 |row| row.get(0),
2550 ).optional()?;
2551
2552 if let Some(id) = item_id {
2553 self.remove_checkpoint_item(checkpoint_id, &id, actor)?;
2554 removed += 1;
2555 }
2556 }
2557
2558 Ok(removed)
2559 }
2560
2561 #[allow(clippy::too_many_arguments)]
2571 pub fn save_memory(
2572 &mut self,
2573 id: &str,
2574 project_path: &str,
2575 key: &str,
2576 value: &str,
2577 category: &str,
2578 actor: &str,
2579 ) -> Result<()> {
2580 let now = chrono::Utc::now().timestamp_millis();
2581
2582 self.mutate("save_memory", actor, |tx, ctx| {
2583 tx.execute(
2584 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
2585 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
2586 ON CONFLICT(project_path, key) DO UPDATE SET
2587 value = excluded.value,
2588 category = excluded.category,
2589 updated_at = excluded.updated_at",
2590 rusqlite::params![id, project_path, key, value, category, now],
2591 )?;
2592
2593 ctx.record_event("memory", id, EventType::MemorySaved);
2594
2595 Ok(())
2596 })
2597 }
2598
2599 pub fn get_memory(&self, project_path: &str, key: &str) -> Result<Option<Memory>> {
2605 let mut stmt = self.conn.prepare(
2606 "SELECT id, project_path, key, value, category, created_at, updated_at
2607 FROM project_memory WHERE project_path = ?1 AND key = ?2",
2608 )?;
2609
2610 let memory = stmt
2611 .query_row(rusqlite::params![project_path, key], |row| {
2612 Ok(Memory {
2613 id: row.get(0)?,
2614 project_path: row.get(1)?,
2615 key: row.get(2)?,
2616 value: row.get(3)?,
2617 category: row.get(4)?,
2618 created_at: row.get(5)?,
2619 updated_at: row.get(6)?,
2620 })
2621 })
2622 .optional()?;
2623
2624 Ok(memory)
2625 }
2626
2627 pub fn list_memory(
2633 &self,
2634 project_path: &str,
2635 category: Option<&str>,
2636 ) -> Result<Vec<Memory>> {
2637 let map_row = |row: &rusqlite::Row| -> rusqlite::Result<Memory> {
2638 Ok(Memory {
2639 id: row.get(0)?,
2640 project_path: row.get(1)?,
2641 key: row.get(2)?,
2642 value: row.get(3)?,
2643 category: row.get(4)?,
2644 created_at: row.get(5)?,
2645 updated_at: row.get(6)?,
2646 })
2647 };
2648
2649 let rows = if let Some(cat) = category {
2650 let mut stmt = self.conn.prepare(
2651 "SELECT id, project_path, key, value, category, created_at, updated_at
2652 FROM project_memory WHERE project_path = ?1 AND category = ?2
2653 ORDER BY key ASC",
2654 )?;
2655 stmt.query_map(rusqlite::params![project_path, cat], map_row)?
2656 .collect::<std::result::Result<Vec<_>, _>>()
2657 } else {
2658 let mut stmt = self.conn.prepare(
2659 "SELECT id, project_path, key, value, category, created_at, updated_at
2660 FROM project_memory WHERE project_path = ?1
2661 ORDER BY key ASC",
2662 )?;
2663 stmt.query_map(rusqlite::params![project_path], map_row)?
2664 .collect::<std::result::Result<Vec<_>, _>>()
2665 };
2666
2667 rows.map_err(Error::from)
2668 }
2669
2670 pub fn delete_memory(
2676 &mut self,
2677 project_path: &str,
2678 key: &str,
2679 actor: &str,
2680 ) -> Result<()> {
2681 let proj_path = project_path.to_string();
2682 self.mutate("delete_memory", actor, |tx, ctx| {
2683 let id: Option<String> = tx
2685 .query_row(
2686 "SELECT id FROM project_memory WHERE project_path = ?1 AND key = ?2",
2687 rusqlite::params![proj_path, key],
2688 |row| row.get(0),
2689 )
2690 .optional()?;
2691
2692 let rows = tx.execute(
2693 "DELETE FROM project_memory WHERE project_path = ?1 AND key = ?2",
2694 rusqlite::params![proj_path, key],
2695 )?;
2696
2697 if rows > 0 {
2698 if let Some(ref mem_id) = id {
2699 ctx.record_event("memory", mem_id, EventType::MemoryDeleted);
2700
2701 let now = chrono::Utc::now().timestamp_millis();
2703 tx.execute(
2704 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2705 VALUES ('memory', ?1, ?2, ?3, ?4, 0)
2706 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2707 deleted_at = excluded.deleted_at,
2708 deleted_by = excluded.deleted_by,
2709 exported = 0",
2710 rusqlite::params![mem_id, proj_path, now, ctx.actor],
2711 )?;
2712 }
2713 }
2714
2715 Ok(())
2716 })
2717 }
2718
2719 pub fn get_dirty_sessions(&self) -> Result<Vec<String>> {
2729 let mut stmt = self.conn.prepare(
2730 "SELECT session_id FROM dirty_sessions ORDER BY marked_at ASC",
2731 )?;
2732 let rows = stmt.query_map([], |row| row.get(0))?;
2733 rows.collect::<std::result::Result<Vec<_>, _>>()
2734 .map_err(Error::from)
2735 }
2736
2737 pub fn get_dirty_issues(&self) -> Result<Vec<String>> {
2743 let mut stmt = self.conn.prepare(
2744 "SELECT issue_id FROM dirty_issues ORDER BY marked_at ASC",
2745 )?;
2746 let rows = stmt.query_map([], |row| row.get(0))?;
2747 rows.collect::<std::result::Result<Vec<_>, _>>()
2748 .map_err(Error::from)
2749 }
2750
2751 pub fn get_dirty_context_items(&self) -> Result<Vec<String>> {
2757 let mut stmt = self.conn.prepare(
2758 "SELECT item_id FROM dirty_context_items ORDER BY marked_at ASC",
2759 )?;
2760 let rows = stmt.query_map([], |row| row.get(0))?;
2761 rows.collect::<std::result::Result<Vec<_>, _>>()
2762 .map_err(Error::from)
2763 }
2764
2765 pub fn clear_dirty_sessions(&mut self, ids: &[String]) -> Result<()> {
2771 if ids.is_empty() {
2772 return Ok(());
2773 }
2774 let placeholders = vec!["?"; ids.len()].join(",");
2775 let sql = format!("DELETE FROM dirty_sessions WHERE session_id IN ({placeholders})");
2776 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2777 self.conn.execute(&sql, params.as_slice())?;
2778 Ok(())
2779 }
2780
2781 pub fn clear_dirty_issues(&mut self, ids: &[String]) -> Result<()> {
2787 if ids.is_empty() {
2788 return Ok(());
2789 }
2790 let placeholders = vec!["?"; ids.len()].join(",");
2791 let sql = format!("DELETE FROM dirty_issues WHERE issue_id IN ({placeholders})");
2792 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2793 self.conn.execute(&sql, params.as_slice())?;
2794 Ok(())
2795 }
2796
2797 pub fn clear_dirty_context_items(&mut self, ids: &[String]) -> Result<()> {
2803 if ids.is_empty() {
2804 return Ok(());
2805 }
2806 let placeholders = vec!["?"; ids.len()].join(",");
2807 let sql = format!("DELETE FROM dirty_context_items WHERE item_id IN ({placeholders})");
2808 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2809 self.conn.execute(&sql, params.as_slice())?;
2810 Ok(())
2811 }
2812
2813 pub fn get_export_hash(&self, entity_type: &str, entity_id: &str) -> Result<Option<String>> {
2819 let mut stmt = self.conn.prepare(
2820 "SELECT content_hash FROM export_hashes WHERE entity_type = ?1 AND entity_id = ?2",
2821 )?;
2822 let hash = stmt
2823 .query_row(rusqlite::params![entity_type, entity_id], |row| row.get(0))
2824 .optional()?;
2825 Ok(hash)
2826 }
2827
2828 pub fn set_export_hash(&mut self, entity_type: &str, entity_id: &str, hash: &str) -> Result<()> {
2834 let now = chrono::Utc::now().timestamp_millis();
2835 self.conn.execute(
2836 "INSERT INTO export_hashes (entity_type, entity_id, content_hash, exported_at)
2837 VALUES (?1, ?2, ?3, ?4)
2838 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2839 content_hash = excluded.content_hash,
2840 exported_at = excluded.exported_at",
2841 rusqlite::params![entity_type, entity_id, hash, now],
2842 )?;
2843 Ok(())
2844 }
2845
2846 pub fn record_deletion(
2859 &mut self,
2860 entity_type: &str,
2861 entity_id: &str,
2862 project_path: &str,
2863 actor: &str,
2864 ) -> Result<()> {
2865 let now = chrono::Utc::now().timestamp_millis();
2866 self.conn.execute(
2867 "INSERT INTO sync_deletions (entity_type, entity_id, project_path, deleted_at, deleted_by, exported)
2868 VALUES (?1, ?2, ?3, ?4, ?5, 0)
2869 ON CONFLICT(entity_type, entity_id) DO UPDATE SET
2870 deleted_at = excluded.deleted_at,
2871 deleted_by = excluded.deleted_by,
2872 exported = 0",
2873 rusqlite::params![entity_type, entity_id, project_path, now, actor],
2874 )?;
2875 Ok(())
2876 }
2877
2878 pub fn get_pending_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2884 let mut stmt = self.conn.prepare(
2885 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2886 FROM sync_deletions
2887 WHERE project_path = ?1 AND exported = 0
2888 ORDER BY deleted_at ASC",
2889 )?;
2890 let rows = stmt.query_map([project_path], |row| {
2891 Ok(SyncDeletion {
2892 id: row.get(0)?,
2893 entity_type: row.get(1)?,
2894 entity_id: row.get(2)?,
2895 project_path: row.get(3)?,
2896 deleted_at: row.get(4)?,
2897 deleted_by: row.get(5)?,
2898 })
2899 })?;
2900 rows.collect::<std::result::Result<Vec<_>, _>>()
2901 .map_err(Error::from)
2902 }
2903
2904 pub fn get_all_deletions(&self, project_path: &str) -> Result<Vec<SyncDeletion>> {
2910 let mut stmt = self.conn.prepare(
2911 "SELECT id, entity_type, entity_id, project_path, deleted_at, deleted_by
2912 FROM sync_deletions
2913 WHERE project_path = ?1
2914 ORDER BY deleted_at ASC",
2915 )?;
2916 let rows = stmt.query_map([project_path], |row| {
2917 Ok(SyncDeletion {
2918 id: row.get(0)?,
2919 entity_type: row.get(1)?,
2920 entity_id: row.get(2)?,
2921 project_path: row.get(3)?,
2922 deleted_at: row.get(4)?,
2923 deleted_by: row.get(5)?,
2924 })
2925 })?;
2926 rows.collect::<std::result::Result<Vec<_>, _>>()
2927 .map_err(Error::from)
2928 }
2929
2930 pub fn mark_deletions_exported(&mut self, ids: &[i64]) -> Result<()> {
2936 if ids.is_empty() {
2937 return Ok(());
2938 }
2939 let placeholders = vec!["?"; ids.len()].join(",");
2940 let sql = format!("UPDATE sync_deletions SET exported = 1 WHERE id IN ({placeholders})");
2941 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
2942 self.conn.execute(&sql, params.as_slice())?;
2943 Ok(())
2944 }
2945
2946 pub fn count_pending_deletions(&self, project_path: &str) -> Result<usize> {
2952 let count: i64 = self.conn.query_row(
2953 "SELECT COUNT(*) FROM sync_deletions WHERE project_path = ?1 AND exported = 0",
2954 [project_path],
2955 |row| row.get(0),
2956 )?;
2957 Ok(count as usize)
2958 }
2959
2960 pub fn apply_deletion(&mut self, entity_type: &str, entity_id: &str) -> Result<bool> {
2966 let sql = match entity_type {
2967 "session" => "DELETE FROM sessions WHERE id = ?1",
2968 "issue" => "DELETE FROM issues WHERE id = ?1",
2969 "context_item" => "DELETE FROM context_items WHERE id = ?1",
2970 "memory" => "DELETE FROM project_memory WHERE id = ?1",
2971 "checkpoint" => "DELETE FROM checkpoints WHERE id = ?1",
2972 _ => return Ok(false),
2973 };
2974 let rows = self.conn.execute(sql, [entity_id])?;
2975 Ok(rows > 0)
2976 }
2977
2978 pub fn get_all_sessions(&self) -> Result<Vec<Session>> {
2984 let mut stmt = self.conn.prepare(
2985 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
2986 FROM sessions ORDER BY created_at ASC",
2987 )?;
2988 let rows = stmt.query_map([], |row| {
2989 Ok(Session {
2990 id: row.get(0)?,
2991 name: row.get(1)?,
2992 description: row.get(2)?,
2993 branch: row.get(3)?,
2994 channel: row.get(4)?,
2995 project_path: row.get(5)?,
2996 status: row.get(6)?,
2997 ended_at: row.get(7)?,
2998 created_at: row.get(8)?,
2999 updated_at: row.get(9)?,
3000 })
3001 })?;
3002 rows.collect::<std::result::Result<Vec<_>, _>>()
3003 .map_err(Error::from)
3004 }
3005
3006 pub fn get_all_issues(&self) -> Result<Vec<Issue>> {
3012 let mut stmt = self.conn.prepare(
3013 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3014 FROM issues ORDER BY created_at ASC",
3015 )?;
3016 let rows = stmt.query_map([], map_issue_row)?;
3017 rows.collect::<std::result::Result<Vec<_>, _>>()
3018 .map_err(Error::from)
3019 }
3020
3021 pub fn get_all_context_items(
3027 &self,
3028 category: Option<&str>,
3029 priority: Option<&str>,
3030 limit: Option<u32>,
3031 ) -> Result<Vec<ContextItem>> {
3032 let mut sql = String::from(
3033 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3034 FROM context_items WHERE 1=1",
3035 );
3036
3037 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![];
3038
3039 if let Some(cat) = category {
3040 sql.push_str(" AND category = ?");
3041 params.push(Box::new(cat.to_string()));
3042 }
3043
3044 if let Some(pri) = priority {
3045 sql.push_str(" AND priority = ?");
3046 params.push(Box::new(pri.to_string()));
3047 }
3048
3049 sql.push_str(" ORDER BY created_at DESC");
3050 if let Some(lim) = limit {
3051 sql.push_str(" LIMIT ?");
3052 params.push(Box::new(lim));
3053 }
3054
3055 let mut stmt = self.conn.prepare(&sql)?;
3056 let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|b| b.as_ref()).collect();
3057
3058 let rows = stmt.query_map(params_refs.as_slice(), |row| {
3059 Ok(ContextItem {
3060 id: row.get(0)?,
3061 session_id: row.get(1)?,
3062 key: row.get(2)?,
3063 value: row.get(3)?,
3064 category: row.get(4)?,
3065 priority: row.get(5)?,
3066 channel: row.get(6)?,
3067 tags: row.get(7)?,
3068 size: row.get(8)?,
3069 created_at: row.get(9)?,
3070 updated_at: row.get(10)?,
3071 })
3072 })?;
3073 rows.collect::<std::result::Result<Vec<_>, _>>()
3074 .map_err(Error::from)
3075 }
3076
3077 pub fn get_all_memory(&self) -> Result<Vec<Memory>> {
3083 let mut stmt = self.conn.prepare(
3084 "SELECT id, project_path, key, value, category, created_at, updated_at
3085 FROM project_memory ORDER BY created_at ASC",
3086 )?;
3087 let rows = stmt.query_map([], |row| {
3088 Ok(Memory {
3089 id: row.get(0)?,
3090 project_path: row.get(1)?,
3091 key: row.get(2)?,
3092 value: row.get(3)?,
3093 category: row.get(4)?,
3094 created_at: row.get(5)?,
3095 updated_at: row.get(6)?,
3096 })
3097 })?;
3098 rows.collect::<std::result::Result<Vec<_>, _>>()
3099 .map_err(Error::from)
3100 }
3101
3102 pub fn get_all_issue_short_ids(&self) -> Result<Vec<String>> {
3107 let mut stmt = self
3108 .conn
3109 .prepare("SELECT short_id FROM issues WHERE short_id IS NOT NULL")?;
3110 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3111 rows.collect::<std::result::Result<Vec<_>, _>>()
3112 .map_err(Error::from)
3113 }
3114
3115 pub fn get_all_session_ids(&self) -> Result<Vec<String>> {
3117 let mut stmt = self.conn.prepare("SELECT id FROM sessions")?;
3118 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3119 rows.collect::<std::result::Result<Vec<_>, _>>()
3120 .map_err(Error::from)
3121 }
3122
3123 pub fn get_all_checkpoint_ids(&self) -> Result<Vec<String>> {
3125 let mut stmt = self.conn.prepare("SELECT id FROM checkpoints")?;
3126 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
3127 rows.collect::<std::result::Result<Vec<_>, _>>()
3128 .map_err(Error::from)
3129 }
3130
3131 pub fn get_all_checkpoints(&self) -> Result<Vec<Checkpoint>> {
3137 let mut stmt = self.conn.prepare(
3138 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3139 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3140 FROM checkpoints c ORDER BY c.created_at ASC",
3141 )?;
3142 let rows = stmt.query_map([], |row| {
3143 Ok(Checkpoint {
3144 id: row.get(0)?,
3145 session_id: row.get(1)?,
3146 name: row.get(2)?,
3147 description: row.get(3)?,
3148 git_status: row.get(4)?,
3149 git_branch: row.get(5)?,
3150 created_at: row.get(6)?,
3151 item_count: row.get(7)?,
3152 })
3153 })?;
3154 rows.collect::<std::result::Result<Vec<_>, _>>()
3155 .map_err(Error::from)
3156 }
3157
3158 pub fn get_context_item(&self, id: &str) -> Result<Option<ContextItem>> {
3164 let mut stmt = self.conn.prepare(
3165 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
3166 FROM context_items WHERE id = ?1",
3167 )?;
3168 let item = stmt
3169 .query_row([id], |row| {
3170 Ok(ContextItem {
3171 id: row.get(0)?,
3172 session_id: row.get(1)?,
3173 key: row.get(2)?,
3174 value: row.get(3)?,
3175 category: row.get(4)?,
3176 priority: row.get(5)?,
3177 channel: row.get(6)?,
3178 tags: row.get(7)?,
3179 size: row.get(8)?,
3180 created_at: row.get(9)?,
3181 updated_at: row.get(10)?,
3182 })
3183 })
3184 .optional()?;
3185 Ok(item)
3186 }
3187
3188 pub fn get_sessions_by_project(&self, project_path: &str) -> Result<Vec<Session>> {
3198 let mut stmt = self.conn.prepare(
3199 "SELECT id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at
3200 FROM sessions WHERE project_path = ?1 ORDER BY created_at ASC",
3201 )?;
3202 let rows = stmt.query_map([project_path], |row| {
3203 Ok(Session {
3204 id: row.get(0)?,
3205 name: row.get(1)?,
3206 description: row.get(2)?,
3207 branch: row.get(3)?,
3208 channel: row.get(4)?,
3209 project_path: row.get(5)?,
3210 status: row.get(6)?,
3211 ended_at: row.get(7)?,
3212 created_at: row.get(8)?,
3213 updated_at: row.get(9)?,
3214 })
3215 })?;
3216 rows.collect::<std::result::Result<Vec<_>, _>>()
3217 .map_err(Error::from)
3218 }
3219
3220 pub fn get_issues_by_project(&self, project_path: &str) -> Result<Vec<Issue>> {
3226 let mut stmt = self.conn.prepare(
3227 "SELECT id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at
3228 FROM issues WHERE project_path = ?1 ORDER BY created_at ASC",
3229 )?;
3230 let rows = stmt.query_map([project_path], map_issue_row)?;
3231 rows.collect::<std::result::Result<Vec<_>, _>>()
3232 .map_err(Error::from)
3233 }
3234
3235 pub fn get_context_items_by_project(&self, project_path: &str) -> Result<Vec<ContextItem>> {
3244 let mut stmt = self.conn.prepare(
3245 "SELECT ci.id, ci.session_id, ci.key, ci.value, ci.category, ci.priority, ci.channel, ci.tags, ci.size, ci.created_at, ci.updated_at
3246 FROM context_items ci
3247 INNER JOIN sessions s ON ci.session_id = s.id
3248 WHERE s.project_path = ?1
3249 ORDER BY ci.created_at ASC",
3250 )?;
3251 let rows = stmt.query_map([project_path], |row| {
3252 Ok(ContextItem {
3253 id: row.get(0)?,
3254 session_id: row.get(1)?,
3255 key: row.get(2)?,
3256 value: row.get(3)?,
3257 category: row.get(4)?,
3258 priority: row.get(5)?,
3259 channel: row.get(6)?,
3260 tags: row.get(7)?,
3261 size: row.get(8)?,
3262 created_at: row.get(9)?,
3263 updated_at: row.get(10)?,
3264 })
3265 })?;
3266 rows.collect::<std::result::Result<Vec<_>, _>>()
3267 .map_err(Error::from)
3268 }
3269
3270 pub fn get_memory_by_project(&self, project_path: &str) -> Result<Vec<Memory>> {
3276 let mut stmt = self.conn.prepare(
3277 "SELECT id, project_path, key, value, category, created_at, updated_at
3278 FROM project_memory WHERE project_path = ?1 ORDER BY created_at ASC",
3279 )?;
3280 let rows = stmt.query_map([project_path], |row| {
3281 Ok(Memory {
3282 id: row.get(0)?,
3283 project_path: row.get(1)?,
3284 key: row.get(2)?,
3285 value: row.get(3)?,
3286 category: row.get(4)?,
3287 created_at: row.get(5)?,
3288 updated_at: row.get(6)?,
3289 })
3290 })?;
3291 rows.collect::<std::result::Result<Vec<_>, _>>()
3292 .map_err(Error::from)
3293 }
3294
3295 pub fn get_checkpoints_by_project(&self, project_path: &str) -> Result<Vec<Checkpoint>> {
3304 let mut stmt = self.conn.prepare(
3305 "SELECT c.id, c.session_id, c.name, c.description, c.git_status, c.git_branch, c.created_at,
3306 (SELECT COUNT(*) FROM checkpoint_items ci WHERE ci.checkpoint_id = c.id) as item_count
3307 FROM checkpoints c
3308 INNER JOIN sessions s ON c.session_id = s.id
3309 WHERE s.project_path = ?1
3310 ORDER BY c.created_at ASC",
3311 )?;
3312 let rows = stmt.query_map([project_path], |row| {
3313 Ok(Checkpoint {
3314 id: row.get(0)?,
3315 session_id: row.get(1)?,
3316 name: row.get(2)?,
3317 description: row.get(3)?,
3318 git_status: row.get(4)?,
3319 git_branch: row.get(5)?,
3320 created_at: row.get(6)?,
3321 item_count: row.get(7)?,
3322 })
3323 })?;
3324 rows.collect::<std::result::Result<Vec<_>, _>>()
3325 .map_err(Error::from)
3326 }
3327
3328 pub fn get_dirty_sessions_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3334 let mut stmt = self.conn.prepare(
3335 "SELECT ds.session_id
3336 FROM dirty_sessions ds
3337 INNER JOIN sessions s ON ds.session_id = s.id
3338 WHERE s.project_path = ?1",
3339 )?;
3340 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3341 rows.collect::<std::result::Result<Vec<_>, _>>()
3342 .map_err(Error::from)
3343 }
3344
3345 pub fn get_dirty_issues_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3351 let mut stmt = self.conn.prepare(
3352 "SELECT di.issue_id
3353 FROM dirty_issues di
3354 INNER JOIN issues i ON di.issue_id = i.id
3355 WHERE i.project_path = ?1",
3356 )?;
3357 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3358 rows.collect::<std::result::Result<Vec<_>, _>>()
3359 .map_err(Error::from)
3360 }
3361
3362 pub fn get_dirty_context_items_by_project(&self, project_path: &str) -> Result<Vec<String>> {
3368 let mut stmt = self.conn.prepare(
3369 "SELECT dci.item_id
3370 FROM dirty_context_items dci
3371 INNER JOIN context_items ci ON dci.item_id = ci.id
3372 INNER JOIN sessions s ON ci.session_id = s.id
3373 WHERE s.project_path = ?1",
3374 )?;
3375 let rows = stmt.query_map([project_path], |row| row.get(0))?;
3376 rows.collect::<std::result::Result<Vec<_>, _>>()
3377 .map_err(Error::from)
3378 }
3379
3380 pub fn backfill_dirty_for_project(&mut self, project_path: &str) -> Result<BackfillStats> {
3390 let now = chrono::Utc::now().timestamp_millis();
3391
3392 let sessions_count = self.conn.execute(
3394 "INSERT OR IGNORE INTO dirty_sessions (session_id, marked_at)
3395 SELECT id, ?1 FROM sessions WHERE project_path = ?2",
3396 rusqlite::params![now, project_path],
3397 )?;
3398
3399 let issues_count = self.conn.execute(
3401 "INSERT OR IGNORE INTO dirty_issues (issue_id, marked_at)
3402 SELECT id, ?1 FROM issues WHERE project_path = ?2",
3403 rusqlite::params![now, project_path],
3404 )?;
3405
3406 let context_items_count = self.conn.execute(
3408 "INSERT OR IGNORE INTO dirty_context_items (item_id, marked_at)
3409 SELECT ci.id, ?1 FROM context_items ci
3410 INNER JOIN sessions s ON ci.session_id = s.id
3411 WHERE s.project_path = ?2",
3412 rusqlite::params![now, project_path],
3413 )?;
3414
3415 let plans_count = self.conn.execute(
3417 "INSERT OR IGNORE INTO dirty_plans (plan_id, marked_at)
3418 SELECT id, ?1 FROM plans WHERE project_path = ?2",
3419 rusqlite::params![now, project_path],
3420 )?;
3421
3422 Ok(BackfillStats {
3423 sessions: sessions_count,
3424 issues: issues_count,
3425 context_items: context_items_count,
3426 plans: plans_count,
3427 })
3428 }
3429
3430 pub fn get_project_counts(&self, project_path: &str) -> Result<ProjectCounts> {
3436 let sessions: i64 = self.conn.query_row(
3437 "SELECT COUNT(*) FROM sessions WHERE project_path = ?1",
3438 [project_path],
3439 |row| row.get(0),
3440 )?;
3441
3442 let issues: i64 = self.conn.query_row(
3443 "SELECT COUNT(*) FROM issues WHERE project_path = ?1",
3444 [project_path],
3445 |row| row.get(0),
3446 )?;
3447
3448 let context_items: i64 = self.conn.query_row(
3449 "SELECT COUNT(*) FROM context_items ci
3450 INNER JOIN sessions s ON ci.session_id = s.id
3451 WHERE s.project_path = ?1",
3452 [project_path],
3453 |row| row.get(0),
3454 )?;
3455
3456 let memories: i64 = self.conn.query_row(
3457 "SELECT COUNT(*) FROM project_memory WHERE project_path = ?1",
3458 [project_path],
3459 |row| row.get(0),
3460 )?;
3461
3462 let checkpoints: i64 = self.conn.query_row(
3463 "SELECT COUNT(*) FROM checkpoints c
3464 INNER JOIN sessions s ON c.session_id = s.id
3465 WHERE s.project_path = ?1",
3466 [project_path],
3467 |row| row.get(0),
3468 )?;
3469
3470 Ok(ProjectCounts {
3471 sessions: sessions as usize,
3472 issues: issues as usize,
3473 context_items: context_items as usize,
3474 memories: memories as usize,
3475 checkpoints: checkpoints as usize,
3476 })
3477 }
3478
3479 pub fn upsert_session(&mut self, session: &Session) -> Result<()> {
3491 self.conn.execute(
3492 "INSERT INTO sessions (id, name, description, branch, channel, project_path, status, ended_at, created_at, updated_at)
3493 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
3494 ON CONFLICT(id) DO UPDATE SET
3495 name = excluded.name,
3496 description = excluded.description,
3497 branch = excluded.branch,
3498 channel = excluded.channel,
3499 project_path = excluded.project_path,
3500 status = excluded.status,
3501 ended_at = excluded.ended_at,
3502 updated_at = excluded.updated_at",
3503 rusqlite::params![
3504 session.id,
3505 session.name,
3506 session.description,
3507 session.branch,
3508 session.channel,
3509 session.project_path,
3510 session.status,
3511 session.ended_at,
3512 session.created_at,
3513 session.updated_at,
3514 ],
3515 )?;
3516 Ok(())
3517 }
3518
3519 pub fn upsert_issue(&mut self, issue: &Issue) -> Result<()> {
3525 self.conn.execute(
3526 "INSERT INTO issues (id, short_id, project_path, title, description, details, status, priority, issue_type, plan_id, created_by_agent, assigned_to_agent, created_at, updated_at, closed_at)
3527 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
3528 ON CONFLICT(id) DO UPDATE SET
3529 short_id = excluded.short_id,
3530 project_path = excluded.project_path,
3531 title = excluded.title,
3532 description = excluded.description,
3533 details = excluded.details,
3534 status = excluded.status,
3535 priority = excluded.priority,
3536 issue_type = excluded.issue_type,
3537 plan_id = excluded.plan_id,
3538 assigned_to_agent = excluded.assigned_to_agent,
3539 updated_at = excluded.updated_at,
3540 closed_at = excluded.closed_at",
3541 rusqlite::params![
3542 issue.id,
3543 issue.short_id,
3544 issue.project_path,
3545 issue.title,
3546 issue.description,
3547 issue.details,
3548 issue.status,
3549 issue.priority,
3550 issue.issue_type,
3551 issue.plan_id,
3552 issue.created_by_agent,
3553 issue.assigned_to_agent,
3554 issue.created_at,
3555 issue.updated_at,
3556 issue.closed_at,
3557 ],
3558 )?;
3559 Ok(())
3560 }
3561
3562 pub fn upsert_context_item(&mut self, item: &ContextItem) -> Result<()> {
3568 self.conn.execute(
3569 "INSERT INTO context_items (id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at)
3570 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)
3571 ON CONFLICT(id) DO UPDATE SET
3572 key = excluded.key,
3573 value = excluded.value,
3574 category = excluded.category,
3575 priority = excluded.priority,
3576 channel = excluded.channel,
3577 tags = excluded.tags,
3578 size = excluded.size,
3579 updated_at = excluded.updated_at",
3580 rusqlite::params![
3581 item.id,
3582 item.session_id,
3583 item.key,
3584 item.value,
3585 item.category,
3586 item.priority,
3587 item.channel,
3588 item.tags,
3589 item.size,
3590 item.created_at,
3591 item.updated_at,
3592 ],
3593 )?;
3594 Ok(())
3595 }
3596
3597 pub fn upsert_memory(&mut self, memory: &Memory) -> Result<()> {
3603 self.conn.execute(
3604 "INSERT INTO project_memory (id, project_path, key, value, category, created_at, updated_at)
3605 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3606 ON CONFLICT(id) DO UPDATE SET
3607 key = excluded.key,
3608 value = excluded.value,
3609 category = excluded.category,
3610 updated_at = excluded.updated_at",
3611 rusqlite::params![
3612 memory.id,
3613 memory.project_path,
3614 memory.key,
3615 memory.value,
3616 memory.category,
3617 memory.created_at,
3618 memory.updated_at,
3619 ],
3620 )?;
3621 Ok(())
3622 }
3623
3624 pub fn upsert_checkpoint(&mut self, checkpoint: &Checkpoint) -> Result<()> {
3632 self.conn.execute(
3633 "INSERT INTO checkpoints (id, session_id, name, description, git_status, git_branch, created_at)
3634 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
3635 ON CONFLICT(id) DO UPDATE SET
3636 name = excluded.name,
3637 description = excluded.description,
3638 git_status = excluded.git_status,
3639 git_branch = excluded.git_branch",
3640 rusqlite::params![
3641 checkpoint.id,
3642 checkpoint.session_id,
3643 checkpoint.name,
3644 checkpoint.description,
3645 checkpoint.git_status,
3646 checkpoint.git_branch,
3647 checkpoint.created_at,
3648 ],
3649 )?;
3650 Ok(())
3651 }
3652
3653 pub fn create_project(&mut self, project: &Project, actor: &str) -> Result<()> {
3663 self.mutate("create_project", actor, |tx, ctx| {
3664 tx.execute(
3665 "INSERT INTO projects (id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at)
3666 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
3667 rusqlite::params![
3668 project.id,
3669 project.project_path,
3670 project.name,
3671 project.description,
3672 project.issue_prefix,
3673 project.next_issue_number,
3674 project.plan_prefix,
3675 project.next_plan_number,
3676 project.created_at,
3677 project.updated_at,
3678 ],
3679 )?;
3680
3681 ctx.record_event("project", &project.id, EventType::ProjectCreated);
3682 Ok(())
3683 })
3684 }
3685
3686 pub fn get_project(&self, id: &str) -> Result<Option<Project>> {
3692 let project = self
3693 .conn
3694 .query_row(
3695 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3696 FROM projects WHERE id = ?1",
3697 [id],
3698 map_project_row,
3699 )
3700 .optional()?;
3701 Ok(project)
3702 }
3703
3704 pub fn get_project_by_path(&self, project_path: &str) -> Result<Option<Project>> {
3710 let project = self
3711 .conn
3712 .query_row(
3713 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3714 FROM projects WHERE project_path = ?1",
3715 [project_path],
3716 map_project_row,
3717 )
3718 .optional()?;
3719 Ok(project)
3720 }
3721
3722 pub fn list_projects(&self, limit: usize) -> Result<Vec<Project>> {
3728 let mut stmt = self.conn.prepare(
3729 "SELECT id, project_path, name, description, issue_prefix, next_issue_number, plan_prefix, next_plan_number, created_at, updated_at
3730 FROM projects
3731 ORDER BY updated_at DESC
3732 LIMIT ?1",
3733 )?;
3734
3735 let projects = stmt
3736 .query_map([limit], map_project_row)?
3737 .collect::<std::result::Result<Vec<_>, _>>()?;
3738
3739 Ok(projects)
3740 }
3741
3742 pub fn update_project(
3748 &mut self,
3749 id: &str,
3750 name: Option<&str>,
3751 description: Option<&str>,
3752 issue_prefix: Option<&str>,
3753 actor: &str,
3754 ) -> Result<()> {
3755 self.mutate("update_project", actor, |tx, ctx| {
3756 let now = chrono::Utc::now().timestamp_millis();
3757
3758 let mut updates = vec!["updated_at = ?1"];
3760 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
3761 let mut param_idx = 2;
3762
3763 if let Some(n) = name {
3764 updates.push(format!("name = ?{param_idx}").leak());
3765 params.push(Box::new(n.to_string()));
3766 param_idx += 1;
3767 }
3768
3769 if let Some(d) = description {
3770 updates.push(format!("description = ?{param_idx}").leak());
3771 params.push(Box::new(d.to_string()));
3772 param_idx += 1;
3773 }
3774
3775 if let Some(p) = issue_prefix {
3776 updates.push(format!("issue_prefix = ?{param_idx}").leak());
3777 params.push(Box::new(p.to_string()));
3778 param_idx += 1;
3779 }
3780
3781 params.push(Box::new(id.to_string()));
3783
3784 let sql = format!(
3785 "UPDATE projects SET {} WHERE id = ?{}",
3786 updates.join(", "),
3787 param_idx
3788 );
3789
3790 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
3791 let affected = tx.execute(&sql, param_refs.as_slice())?;
3792
3793 if affected == 0 {
3794 return Err(Error::ProjectNotFound { id: id.to_string() });
3795 }
3796
3797 ctx.record_event("project", id, EventType::ProjectUpdated);
3798 Ok(())
3799 })
3800 }
3801
3802 pub fn delete_project(&mut self, id: &str, actor: &str) -> Result<()> {
3814 self.mutate("delete_project", actor, |tx, ctx| {
3815 let project_path: Option<String> = tx
3817 .query_row(
3818 "SELECT project_path FROM projects WHERE id = ?1",
3819 [id],
3820 |row| row.get(0),
3821 )
3822 .optional()?;
3823
3824 let project_path = project_path.ok_or_else(|| Error::ProjectNotFound { id: id.to_string() })?;
3825
3826 tx.execute(
3828 "DELETE FROM sessions WHERE project_path = ?1",
3829 [&project_path],
3830 )?;
3831
3832 tx.execute(
3834 "DELETE FROM issues WHERE project_path = ?1",
3835 [&project_path],
3836 )?;
3837
3838 tx.execute(
3840 "DELETE FROM plans WHERE project_path = ?1",
3841 [&project_path],
3842 )?;
3843
3844 tx.execute(
3846 "DELETE FROM project_memory WHERE project_path = ?1",
3847 [&project_path],
3848 )?;
3849
3850 let affected = tx.execute("DELETE FROM projects WHERE id = ?1", [id])?;
3852
3853 if affected == 0 {
3854 return Err(Error::ProjectNotFound { id: id.to_string() });
3855 }
3856
3857 ctx.record_event("project", id, EventType::ProjectDeleted);
3858 Ok(())
3859 })
3860 }
3861
3862 pub fn get_or_create_project(&mut self, project_path: &str, actor: &str) -> Result<Project> {
3871 if let Some(project) = self.get_project_by_path(project_path)? {
3873 return Ok(project);
3874 }
3875
3876 let name = std::path::Path::new(project_path)
3878 .file_name()
3879 .and_then(|n| n.to_str())
3880 .unwrap_or("Unknown Project")
3881 .to_string();
3882
3883 let project = Project::new(project_path.to_string(), name);
3884 self.create_project(&project, actor)?;
3885 Ok(project)
3886 }
3887
3888 pub fn get_next_issue_number(&mut self, project_path: &str) -> Result<i32> {
3894 let project = self
3895 .get_project_by_path(project_path)?
3896 .ok_or_else(|| Error::ProjectNotFound { id: project_path.to_string() })?;
3897
3898 let next_num = project.next_issue_number;
3899
3900 self.conn.execute(
3902 "UPDATE projects SET next_issue_number = next_issue_number + 1, updated_at = ?1 WHERE project_path = ?2",
3903 rusqlite::params![chrono::Utc::now().timestamp_millis(), project_path],
3904 )?;
3905
3906 Ok(next_num)
3907 }
3908
3909 pub fn create_plan(&mut self, plan: &Plan, actor: &str) -> Result<()> {
3919 self.mutate("create_plan", actor, |tx, ctx| {
3920 tx.execute(
3921 "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, source_path, source_hash, created_at, updated_at)
3922 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
3923 rusqlite::params![
3924 plan.id,
3925 plan.short_id,
3926 plan.project_id,
3927 plan.project_path,
3928 plan.title,
3929 plan.content,
3930 plan.status.as_str(),
3931 plan.success_criteria,
3932 plan.session_id,
3933 plan.created_in_session,
3934 plan.source_path,
3935 plan.source_hash,
3936 plan.created_at,
3937 plan.updated_at,
3938 ],
3939 )?;
3940
3941 ctx.record_event("plan", &plan.id, EventType::PlanCreated);
3942 Ok(())
3943 })
3944 }
3945
3946 pub fn get_plan(&self, id: &str) -> Result<Option<Plan>> {
3952 let plan = self
3953 .conn
3954 .query_row(
3955 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3956 FROM plans WHERE id = ?1",
3957 [id],
3958 map_plan_row,
3959 )
3960 .optional()?;
3961 Ok(plan)
3962 }
3963
3964 pub fn list_plans(&self, project_path: &str, status: Option<&str>, limit: usize) -> Result<Vec<Plan>> {
3970 let sql = if let Some(status) = status {
3971 if status == "all" {
3972 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3973 FROM plans WHERE project_path = ?1
3974 ORDER BY updated_at DESC
3975 LIMIT ?2".to_string()
3976 } else {
3977 format!(
3978 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3979 FROM plans WHERE project_path = ?1 AND status = '{}'
3980 ORDER BY updated_at DESC
3981 LIMIT ?2",
3982 status
3983 )
3984 }
3985 } else {
3986 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
3988 FROM plans WHERE project_path = ?1 AND status = 'active'
3989 ORDER BY updated_at DESC
3990 LIMIT ?2".to_string()
3991 };
3992
3993 let mut stmt = self.conn.prepare(&sql)?;
3994 let plans = stmt
3995 .query_map(rusqlite::params![project_path, limit], map_plan_row)?
3996 .collect::<std::result::Result<Vec<_>, _>>()?;
3997
3998 Ok(plans)
3999 }
4000
4001 pub fn update_plan(
4007 &mut self,
4008 id: &str,
4009 title: Option<&str>,
4010 content: Option<&str>,
4011 status: Option<&str>,
4012 success_criteria: Option<&str>,
4013 actor: &str,
4014 ) -> Result<()> {
4015 self.mutate("update_plan", actor, |tx, ctx| {
4016 let now = chrono::Utc::now().timestamp_millis();
4017
4018 let mut updates = vec!["updated_at = ?1"];
4020 let mut params: Vec<Box<dyn rusqlite::ToSql>> = vec![Box::new(now)];
4021 let mut param_idx = 2;
4022
4023 if let Some(t) = title {
4024 updates.push(format!("title = ?{param_idx}").leak());
4025 params.push(Box::new(t.to_string()));
4026 param_idx += 1;
4027 }
4028
4029 if let Some(c) = content {
4030 updates.push(format!("content = ?{param_idx}").leak());
4031 params.push(Box::new(c.to_string()));
4032 param_idx += 1;
4033 }
4034
4035 if let Some(s) = status {
4036 updates.push(format!("status = ?{param_idx}").leak());
4037 params.push(Box::new(s.to_string()));
4038 param_idx += 1;
4039
4040 if s == "completed" {
4042 updates.push(format!("completed_at = ?{param_idx}").leak());
4043 params.push(Box::new(now));
4044 param_idx += 1;
4045 }
4046 }
4047
4048 if let Some(sc) = success_criteria {
4049 updates.push(format!("success_criteria = ?{param_idx}").leak());
4050 params.push(Box::new(sc.to_string()));
4051 param_idx += 1;
4052 }
4053
4054 params.push(Box::new(id.to_string()));
4056
4057 let sql = format!(
4058 "UPDATE plans SET {} WHERE id = ?{}",
4059 updates.join(", "),
4060 param_idx
4061 );
4062
4063 let param_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
4064 let affected = tx.execute(&sql, param_refs.as_slice())?;
4065
4066 if affected == 0 {
4067 return Err(Error::Other(format!("Plan not found: {id}")));
4068 }
4069
4070 let event_type = if status == Some("completed") {
4071 EventType::PlanCompleted
4072 } else {
4073 EventType::PlanUpdated
4074 };
4075 ctx.record_event("plan", id, event_type);
4076 Ok(())
4077 })
4078 }
4079
4080 pub fn get_plans_by_project(&self, project_path: &str) -> Result<Vec<Plan>> {
4086 let mut stmt = self.conn.prepare(
4087 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4088 FROM plans WHERE project_path = ?1 ORDER BY created_at ASC",
4089 )?;
4090 let rows = stmt.query_map([project_path], map_plan_row)?;
4091 let plans: Vec<Plan> = rows.collect::<std::result::Result<_, _>>()?;
4092 Ok(plans)
4093 }
4094
4095 pub fn find_plan_by_source_hash(&self, source_hash: &str) -> Result<Option<Plan>> {
4101 let plan = self
4102 .conn
4103 .query_row(
4104 "SELECT id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at
4105 FROM plans WHERE source_hash = ?1 LIMIT 1",
4106 [source_hash],
4107 map_plan_row,
4108 )
4109 .optional()?;
4110 Ok(plan)
4111 }
4112
4113 pub fn upsert_plan(&mut self, plan: &Plan) -> Result<()> {
4119 self.conn.execute(
4120 "INSERT INTO plans (id, short_id, project_id, project_path, title, content, status, success_criteria, session_id, created_in_session, completed_in_session, source_path, source_hash, created_at, updated_at, completed_at)
4121 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
4122 ON CONFLICT(id) DO UPDATE SET
4123 short_id = excluded.short_id,
4124 title = excluded.title,
4125 content = excluded.content,
4126 status = excluded.status,
4127 success_criteria = excluded.success_criteria,
4128 session_id = excluded.session_id,
4129 source_path = excluded.source_path,
4130 source_hash = excluded.source_hash,
4131 updated_at = excluded.updated_at,
4132 completed_at = excluded.completed_at",
4133 rusqlite::params![
4134 plan.id,
4135 plan.short_id,
4136 plan.project_id,
4137 plan.project_path,
4138 plan.title,
4139 plan.content,
4140 plan.status.as_str(),
4141 plan.success_criteria,
4142 plan.session_id,
4143 plan.created_in_session,
4144 plan.completed_in_session,
4145 plan.source_path,
4146 plan.source_hash,
4147 plan.created_at,
4148 plan.updated_at,
4149 plan.completed_at,
4150 ],
4151 )?;
4152 Ok(())
4153 }
4154
4155 pub fn get_dirty_plans_by_project(&self, project_path: &str) -> Result<Vec<String>> {
4161 let mut stmt = self.conn.prepare(
4162 "SELECT dp.plan_id
4163 FROM dirty_plans dp
4164 INNER JOIN plans p ON dp.plan_id = p.id
4165 WHERE p.project_path = ?1",
4166 )?;
4167 let rows = stmt.query_map([project_path], |row| row.get(0))?;
4168 Ok(rows.collect::<std::result::Result<_, _>>()?)
4169 }
4170
4171 pub fn clear_dirty_plans(&mut self, ids: &[String]) -> Result<()> {
4177 if ids.is_empty() {
4178 return Ok(());
4179 }
4180 let placeholders = vec!["?"; ids.len()].join(",");
4181 let sql = format!("DELETE FROM dirty_plans WHERE plan_id IN ({placeholders})");
4182 let params: Vec<&dyn rusqlite::ToSql> = ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
4183 self.conn.execute(&sql, params.as_slice())?;
4184 Ok(())
4185 }
4186
4187 pub fn store_embedding_chunk(
4200 &mut self,
4201 id: &str,
4202 item_id: &str,
4203 chunk_index: i32,
4204 chunk_text: &str,
4205 embedding: &[f32],
4206 provider: &str,
4207 model: &str,
4208 ) -> Result<()> {
4209 let now = chrono::Utc::now().timestamp_millis();
4210 let dimensions = embedding.len() as i32;
4211
4212 let blob: Vec<u8> = embedding
4214 .iter()
4215 .flat_map(|f| f.to_le_bytes())
4216 .collect();
4217
4218 self.conn.execute(
4219 "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4220 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
4221 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4222 chunk_text = excluded.chunk_text,
4223 embedding = excluded.embedding,
4224 dimensions = excluded.dimensions,
4225 provider = excluded.provider,
4226 model = excluded.model,
4227 created_at = excluded.created_at",
4228 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, provider, model, now],
4229 )?;
4230
4231 self.conn.execute(
4233 "UPDATE context_items SET
4234 embedding_status = 'complete',
4235 embedding_provider = ?1,
4236 embedding_model = ?2,
4237 chunk_count = COALESCE(
4238 (SELECT MAX(chunk_index) + 1 FROM embedding_chunks WHERE item_id = ?3),
4239 1
4240 ),
4241 embedded_at = ?4
4242 WHERE id = ?3",
4243 rusqlite::params![provider, model, item_id, now],
4244 )?;
4245
4246 Ok(())
4247 }
4248
4249 pub fn get_embedding_chunks(&self, item_id: &str) -> Result<Vec<EmbeddingChunk>> {
4255 let mut stmt = self.conn.prepare(
4256 "SELECT id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at
4257 FROM embedding_chunks
4258 WHERE item_id = ?1
4259 ORDER BY chunk_index ASC",
4260 )?;
4261
4262 let rows = stmt.query_map([item_id], |row| {
4263 let blob: Vec<u8> = row.get(4)?;
4264 let dimensions: i32 = row.get(5)?;
4265
4266 let embedding: Vec<f32> = blob
4268 .chunks_exact(4)
4269 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4270 .collect();
4271
4272 Ok(EmbeddingChunk {
4273 id: row.get(0)?,
4274 item_id: row.get(1)?,
4275 chunk_index: row.get(2)?,
4276 chunk_text: row.get(3)?,
4277 embedding,
4278 dimensions: dimensions as usize,
4279 provider: row.get(6)?,
4280 model: row.get(7)?,
4281 created_at: row.get(8)?,
4282 })
4283 })?;
4284
4285 rows.collect::<std::result::Result<Vec<_>, _>>()
4286 .map_err(Error::from)
4287 }
4288
4289 pub fn get_items_without_embeddings(
4295 &self,
4296 session_id: Option<&str>,
4297 limit: Option<u32>,
4298 ) -> Result<Vec<ContextItem>> {
4299 let limit = limit.unwrap_or(100);
4300
4301 let sql = if let Some(sid) = session_id {
4302 format!(
4303 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4304 FROM context_items
4305 WHERE session_id = '{}' AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))
4306 ORDER BY created_at DESC
4307 LIMIT {}",
4308 sid, limit
4309 )
4310 } else {
4311 format!(
4312 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4313 FROM context_items
4314 WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')
4315 ORDER BY created_at DESC
4316 LIMIT {}",
4317 limit
4318 )
4319 };
4320
4321 let mut stmt = self.conn.prepare(&sql)?;
4322 let rows = stmt.query_map([], |row| {
4323 Ok(ContextItem {
4324 id: row.get(0)?,
4325 session_id: row.get(1)?,
4326 key: row.get(2)?,
4327 value: row.get(3)?,
4328 category: row.get(4)?,
4329 priority: row.get(5)?,
4330 channel: row.get(6)?,
4331 tags: row.get(7)?,
4332 size: row.get(8)?,
4333 created_at: row.get(9)?,
4334 updated_at: row.get(10)?,
4335 })
4336 })?;
4337
4338 rows.collect::<std::result::Result<Vec<_>, _>>()
4339 .map_err(Error::from)
4340 }
4341
4342 pub fn count_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4348 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4349 let with: i64 = self.conn.query_row(
4350 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND embedding_status = 'complete'",
4351 [sid],
4352 |row| row.get(0),
4353 )?;
4354 let without: i64 = self.conn.query_row(
4355 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error'))",
4356 [sid],
4357 |row| row.get(0),
4358 )?;
4359 (with, without)
4360 } else {
4361 let with: i64 = self.conn.query_row(
4362 "SELECT COUNT(*) FROM context_items WHERE embedding_status = 'complete'",
4363 [],
4364 |row| row.get(0),
4365 )?;
4366 let without: i64 = self.conn.query_row(
4367 "SELECT COUNT(*) FROM context_items WHERE embedding_status IS NULL OR embedding_status IN ('none', 'pending', 'error')",
4368 [],
4369 |row| row.get(0),
4370 )?;
4371 (with, without)
4372 };
4373
4374 Ok(EmbeddingStats {
4375 with_embeddings: with_embeddings as usize,
4376 without_embeddings: without_embeddings as usize,
4377 })
4378 }
4379
4380 pub fn resync_embedding_status(&self) -> Result<usize> {
4392 let count = self.conn.execute(
4393 "UPDATE context_items SET embedding_status = 'pending'
4394 WHERE embedding_status = 'complete'
4395 AND id NOT IN (SELECT DISTINCT item_id FROM embedding_chunks)",
4396 [],
4397 )?;
4398 Ok(count)
4399 }
4400
4401 pub fn semantic_search(
4411 &self,
4412 query_embedding: &[f32],
4413 session_id: Option<&str>,
4414 limit: usize,
4415 threshold: f32,
4416 ) -> Result<Vec<SemanticSearchResult>> {
4417 let sql = if let Some(sid) = session_id {
4419 format!(
4420 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4421 ci.key, ci.value, ci.category, ci.priority
4422 FROM embedding_chunks ec
4423 INNER JOIN context_items ci ON ec.item_id = ci.id
4424 WHERE ci.session_id = '{}'",
4425 sid
4426 )
4427 } else {
4428 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4429 ci.key, ci.value, ci.category, ci.priority
4430 FROM embedding_chunks ec
4431 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4432 };
4433
4434 let mut stmt = self.conn.prepare(&sql)?;
4435 let rows = stmt.query_map([], |row| {
4436 let blob: Vec<u8> = row.get(4)?;
4437 let embedding: Vec<f32> = blob
4438 .chunks_exact(4)
4439 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4440 .collect();
4441
4442 Ok((
4443 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
4447 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
4452 })?;
4453
4454 let mut results: Vec<SemanticSearchResult> = rows
4456 .filter_map(|row| row.ok())
4457 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4458 let similarity = cosine_similarity(query_embedding, &embedding);
4459 SemanticSearchResult {
4460 item_id,
4461 chunk_index,
4462 chunk_text,
4463 similarity,
4464 key,
4465 value,
4466 category,
4467 priority,
4468 }
4469 })
4470 .filter(|r| r.similarity >= threshold)
4471 .collect();
4472
4473 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4475
4476 let mut seen_items = std::collections::HashSet::new();
4478 let deduped: Vec<SemanticSearchResult> = results
4479 .into_iter()
4480 .filter(|r| seen_items.insert(r.item_id.clone()))
4481 .take(limit)
4482 .collect();
4483
4484 Ok(deduped)
4485 }
4486
4487 pub fn delete_embeddings(&mut self, item_id: &str) -> Result<()> {
4493 self.conn.execute(
4494 "DELETE FROM embedding_chunks WHERE item_id = ?1",
4495 [item_id],
4496 )?;
4497
4498 self.conn.execute(
4499 "UPDATE context_items SET
4500 embedding_status = 'none',
4501 embedding_provider = NULL,
4502 embedding_model = NULL,
4503 chunk_count = 0,
4504 embedded_at = NULL
4505 WHERE id = ?1",
4506 [item_id],
4507 )?;
4508
4509 Ok(())
4510 }
4511
4512 pub fn get_embedding_meta(&self, key: &str) -> Result<Option<String>> {
4518 let value = self.conn.query_row(
4519 "SELECT value FROM embeddings_meta WHERE key = ?1",
4520 [key],
4521 |row| row.get(0),
4522 ).optional()?;
4523 Ok(value)
4524 }
4525
4526 pub fn set_embedding_meta(&mut self, key: &str, value: &str) -> Result<()> {
4532 let now = chrono::Utc::now().timestamp_millis();
4533 self.conn.execute(
4534 "INSERT INTO embeddings_meta (key, value, updated_at)
4535 VALUES (?1, ?2, ?3)
4536 ON CONFLICT(key) DO UPDATE SET
4537 value = excluded.value,
4538 updated_at = excluded.updated_at",
4539 rusqlite::params![key, value, now],
4540 )?;
4541 Ok(())
4542 }
4543
4544 pub fn store_fast_embedding_chunk(
4557 &mut self,
4558 id: &str,
4559 item_id: &str,
4560 chunk_index: i32,
4561 chunk_text: &str,
4562 embedding: &[f32],
4563 model: &str,
4564 ) -> Result<()> {
4565 let now = chrono::Utc::now().timestamp_millis();
4566 let dimensions = embedding.len() as i32;
4567
4568 let blob: Vec<u8> = embedding
4570 .iter()
4571 .flat_map(|f| f.to_le_bytes())
4572 .collect();
4573
4574 self.conn.execute(
4575 "INSERT INTO embedding_chunks_fast (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
4576 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'model2vec', ?7, ?8)
4577 ON CONFLICT(item_id, chunk_index) DO UPDATE SET
4578 chunk_text = excluded.chunk_text,
4579 embedding = excluded.embedding,
4580 dimensions = excluded.dimensions,
4581 model = excluded.model,
4582 created_at = excluded.created_at",
4583 rusqlite::params![id, item_id, chunk_index, chunk_text, blob, dimensions, model, now],
4584 )?;
4585
4586 self.conn.execute(
4588 "UPDATE context_items SET
4589 fast_embedding_status = 'complete',
4590 fast_embedded_at = ?1
4591 WHERE id = ?2",
4592 rusqlite::params![now, item_id],
4593 )?;
4594
4595 Ok(())
4596 }
4597
4598 pub fn search_fast_tier(
4607 &self,
4608 query_embedding: &[f32],
4609 session_id: Option<&str>,
4610 limit: usize,
4611 threshold: f32,
4612 ) -> Result<Vec<SemanticSearchResult>> {
4613 let sql = if let Some(sid) = session_id {
4615 format!(
4616 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4617 ci.key, ci.value, ci.category, ci.priority
4618 FROM embedding_chunks_fast ec
4619 INNER JOIN context_items ci ON ec.item_id = ci.id
4620 WHERE ci.session_id = '{}'",
4621 sid
4622 )
4623 } else {
4624 "SELECT ec.id, ec.item_id, ec.chunk_index, ec.chunk_text, ec.embedding, ec.dimensions,
4625 ci.key, ci.value, ci.category, ci.priority
4626 FROM embedding_chunks_fast ec
4627 INNER JOIN context_items ci ON ec.item_id = ci.id".to_string()
4628 };
4629
4630 let mut stmt = self.conn.prepare(&sql)?;
4631 let rows = stmt.query_map([], |row| {
4632 let blob: Vec<u8> = row.get(4)?;
4633 let embedding: Vec<f32> = blob
4634 .chunks_exact(4)
4635 .map(|bytes| f32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
4636 .collect();
4637
4638 Ok((
4639 row.get::<_, String>(1)?, row.get::<_, i32>(2)?, row.get::<_, String>(3)?, embedding,
4643 row.get::<_, String>(6)?, row.get::<_, String>(7)?, row.get::<_, String>(8)?, row.get::<_, String>(9)?, ))
4648 })?;
4649
4650 let mut results: Vec<SemanticSearchResult> = rows
4652 .filter_map(|row| row.ok())
4653 .map(|(item_id, chunk_index, chunk_text, embedding, key, value, category, priority)| {
4654 let similarity = cosine_similarity(query_embedding, &embedding);
4655 SemanticSearchResult {
4656 item_id,
4657 chunk_index,
4658 chunk_text,
4659 similarity,
4660 key,
4661 value,
4662 category,
4663 priority,
4664 }
4665 })
4666 .filter(|r| r.similarity >= threshold)
4667 .collect();
4668
4669 results.sort_by(|a, b| b.similarity.partial_cmp(&a.similarity).unwrap_or(std::cmp::Ordering::Equal));
4671
4672 let mut seen_items = std::collections::HashSet::new();
4674 let deduped: Vec<SemanticSearchResult> = results
4675 .into_iter()
4676 .filter(|r| seen_items.insert(r.item_id.clone()))
4677 .take(limit)
4678 .collect();
4679
4680 Ok(deduped)
4681 }
4682
4683 pub fn get_items_needing_quality_upgrade(
4691 &self,
4692 session_id: Option<&str>,
4693 limit: Option<u32>,
4694 ) -> Result<Vec<ContextItem>> {
4695 let limit = limit.unwrap_or(100);
4696
4697 let sql = if let Some(sid) = session_id {
4698 format!(
4699 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4700 FROM context_items
4701 WHERE session_id = '{}'
4702 AND fast_embedding_status = 'complete'
4703 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4704 ORDER BY created_at DESC
4705 LIMIT {}",
4706 sid, limit
4707 )
4708 } else {
4709 format!(
4710 "SELECT id, session_id, key, value, category, priority, channel, tags, size, created_at, updated_at
4711 FROM context_items
4712 WHERE fast_embedding_status = 'complete'
4713 AND (embedding_status IS NULL OR embedding_status = 'none' OR embedding_status = 'pending')
4714 ORDER BY created_at DESC
4715 LIMIT {}",
4716 limit
4717 )
4718 };
4719
4720 let mut stmt = self.conn.prepare(&sql)?;
4721 let rows = stmt.query_map([], |row| {
4722 Ok(ContextItem {
4723 id: row.get(0)?,
4724 session_id: row.get(1)?,
4725 key: row.get(2)?,
4726 value: row.get(3)?,
4727 category: row.get(4)?,
4728 priority: row.get(5)?,
4729 channel: row.get(6)?,
4730 tags: row.get(7)?,
4731 size: row.get(8)?,
4732 created_at: row.get(9)?,
4733 updated_at: row.get(10)?,
4734 })
4735 })?;
4736
4737 rows.collect::<std::result::Result<Vec<_>, _>>()
4738 .map_err(Error::from)
4739 }
4740
4741 pub fn delete_fast_embeddings(&mut self, item_id: &str) -> Result<()> {
4747 self.conn.execute(
4748 "DELETE FROM embedding_chunks_fast WHERE item_id = ?1",
4749 [item_id],
4750 )?;
4751
4752 self.conn.execute(
4753 "UPDATE context_items SET
4754 fast_embedding_status = 'none',
4755 fast_embedded_at = NULL
4756 WHERE id = ?1",
4757 [item_id],
4758 )?;
4759
4760 Ok(())
4761 }
4762
4763 pub fn count_fast_embedding_status(&self, session_id: Option<&str>) -> Result<EmbeddingStats> {
4769 let (with_embeddings, without_embeddings) = if let Some(sid) = session_id {
4770 let with: i64 = self.conn.query_row(
4771 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND fast_embedding_status = 'complete'",
4772 [sid],
4773 |row| row.get(0),
4774 )?;
4775 let without: i64 = self.conn.query_row(
4776 "SELECT COUNT(*) FROM context_items WHERE session_id = ?1 AND (fast_embedding_status IS NULL OR fast_embedding_status = 'none')",
4777 [sid],
4778 |row| row.get(0),
4779 )?;
4780 (with, without)
4781 } else {
4782 let with: i64 = self.conn.query_row(
4783 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status = 'complete'",
4784 [],
4785 |row| row.get(0),
4786 )?;
4787 let without: i64 = self.conn.query_row(
4788 "SELECT COUNT(*) FROM context_items WHERE fast_embedding_status IS NULL OR fast_embedding_status = 'none'",
4789 [],
4790 |row| row.get(0),
4791 )?;
4792 (with, without)
4793 };
4794
4795 Ok(EmbeddingStats {
4796 with_embeddings: with_embeddings as usize,
4797 without_embeddings: without_embeddings as usize,
4798 })
4799 }
4800}
4801
4802fn map_plan_row(row: &rusqlite::Row) -> rusqlite::Result<Plan> {
4804 let status_str: String = row.get(6)?;
4805 Ok(Plan {
4806 id: row.get(0)?,
4807 short_id: row.get(1)?,
4808 project_id: row.get(2)?,
4809 project_path: row.get(3)?,
4810 title: row.get(4)?,
4811 content: row.get(5)?,
4812 status: PlanStatus::from_str(&status_str),
4813 success_criteria: row.get(7)?,
4814 session_id: row.get(8)?,
4815 created_in_session: row.get(9)?,
4816 completed_in_session: row.get(10)?,
4817 source_path: row.get(11)?,
4818 source_hash: row.get(12)?,
4819 created_at: row.get(13)?,
4820 updated_at: row.get(14)?,
4821 completed_at: row.get(15)?,
4822 })
4823}
4824
4825fn map_project_row(row: &rusqlite::Row) -> rusqlite::Result<Project> {
4827 Ok(Project {
4828 id: row.get(0)?,
4829 project_path: row.get(1)?,
4830 name: row.get(2)?,
4831 description: row.get(3)?,
4832 issue_prefix: row.get(4)?,
4833 next_issue_number: row.get(5)?,
4834 plan_prefix: row.get(6)?,
4835 next_plan_number: row.get(7)?,
4836 created_at: row.get(8)?,
4837 updated_at: row.get(9)?,
4838 })
4839}
4840
4841fn map_issue_row(row: &rusqlite::Row) -> rusqlite::Result<Issue> {
4843 Ok(Issue {
4844 id: row.get(0)?,
4845 short_id: row.get(1)?,
4846 project_path: row.get(2)?,
4847 title: row.get(3)?,
4848 description: row.get(4)?,
4849 details: row.get(5)?,
4850 status: row.get(6)?,
4851 priority: row.get(7)?,
4852 issue_type: row.get(8)?,
4853 plan_id: row.get(9)?,
4854 created_by_agent: row.get(10)?,
4855 assigned_to_agent: row.get(11)?,
4856 created_at: row.get(12)?,
4857 updated_at: row.get(13)?,
4858 closed_at: row.get(14)?,
4859 })
4860}
4861
4862#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4868pub struct Session {
4869 pub id: String,
4870 pub name: String,
4871 pub description: Option<String>,
4872 pub branch: Option<String>,
4873 pub channel: Option<String>,
4874 pub project_path: Option<String>,
4875 pub status: String,
4876 pub ended_at: Option<i64>,
4877 pub created_at: i64,
4878 pub updated_at: i64,
4879}
4880
4881#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4883pub struct ContextItem {
4884 pub id: String,
4885 pub session_id: String,
4886 pub key: String,
4887 pub value: String,
4888 pub category: String,
4889 pub priority: String,
4890 pub channel: Option<String>,
4891 pub tags: Option<String>,
4892 pub size: i64,
4893 pub created_at: i64,
4894 pub updated_at: i64,
4895}
4896
4897#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4900pub struct Issue {
4901 pub id: String,
4902 pub short_id: Option<String>,
4903 pub project_path: String,
4904 pub title: String,
4905 pub description: Option<String>,
4906 pub details: Option<String>,
4907 pub status: String,
4908 pub priority: i32,
4909 pub issue_type: String,
4910 pub plan_id: Option<String>,
4911 pub created_by_agent: Option<String>,
4912 pub assigned_to_agent: Option<String>,
4913 pub created_at: i64,
4914 pub updated_at: i64,
4915 pub closed_at: Option<i64>,
4916}
4917
4918#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
4920pub struct EpicProgress {
4921 pub total: usize,
4922 pub closed: usize,
4923 pub in_progress: usize,
4924 pub open: usize,
4925 pub blocked: usize,
4926 pub deferred: usize,
4927}
4928
4929#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4931pub struct Checkpoint {
4932 pub id: String,
4933 pub session_id: String,
4934 pub name: String,
4935 pub description: Option<String>,
4936 pub git_status: Option<String>,
4937 pub git_branch: Option<String>,
4938 pub created_at: i64,
4939 pub item_count: i64,
4940}
4941
4942#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4944pub struct Memory {
4945 pub id: String,
4946 pub project_path: String,
4947 pub key: String,
4948 pub value: String,
4949 pub category: String,
4950 pub created_at: i64,
4951 pub updated_at: i64,
4952}
4953
4954#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
4956pub struct SyncDeletion {
4957 pub id: i64,
4959 pub entity_type: String,
4961 pub entity_id: String,
4963 pub project_path: String,
4965 pub deleted_at: i64,
4967 pub deleted_by: String,
4969}
4970
4971#[derive(Debug, Clone, serde::Serialize)]
4976pub struct EmbeddingChunk {
4977 pub id: String,
4979 pub item_id: String,
4981 pub chunk_index: i32,
4983 pub chunk_text: String,
4985 pub embedding: Vec<f32>,
4987 pub dimensions: usize,
4989 pub provider: String,
4991 pub model: String,
4993 pub created_at: i64,
4995}
4996
4997#[derive(Debug, Clone, serde::Serialize)]
4999pub struct EmbeddingStats {
5000 pub with_embeddings: usize,
5002 pub without_embeddings: usize,
5004}
5005
5006#[derive(Debug, Clone, serde::Serialize)]
5008pub struct SemanticSearchResult {
5009 pub item_id: String,
5011 pub chunk_index: i32,
5013 pub chunk_text: String,
5015 pub similarity: f32,
5017 pub key: String,
5019 pub value: String,
5021 pub category: String,
5023 pub priority: String,
5025}
5026
5027fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
5037 if a.len() != b.len() || a.is_empty() {
5038 return 0.0;
5039 }
5040
5041 let mut dot_product = 0.0;
5042 let mut norm_a = 0.0;
5043 let mut norm_b = 0.0;
5044
5045 for (x, y) in a.iter().zip(b.iter()) {
5046 dot_product += x * y;
5047 norm_a += x * x;
5048 norm_b += y * y;
5049 }
5050
5051 let magnitude = (norm_a * norm_b).sqrt();
5052 if magnitude == 0.0 {
5053 0.0
5054 } else {
5055 dot_product / magnitude
5056 }
5057}
5058
5059fn generate_short_id() -> String {
5061 use std::time::{SystemTime, UNIX_EPOCH};
5062 let now = SystemTime::now()
5063 .duration_since(UNIX_EPOCH)
5064 .unwrap()
5065 .as_millis();
5066 format!("{:04x}", (now & 0xFFFF) as u16)
5067}
5068
5069#[cfg(test)]
5070mod tests {
5071 use super::*;
5072
5073 #[test]
5074 fn test_open_memory() {
5075 let storage = SqliteStorage::open_memory();
5076 assert!(storage.is_ok());
5077 }
5078
5079 #[test]
5080 fn test_session_crud() {
5081 let mut storage = SqliteStorage::open_memory().unwrap();
5082
5083 storage
5085 .create_session(
5086 "sess_1",
5087 "Test Session",
5088 Some("A test session"),
5089 Some("/test/project"),
5090 Some("main"),
5091 "test-actor",
5092 )
5093 .unwrap();
5094
5095 let session = storage.get_session("sess_1").unwrap();
5097 assert!(session.is_some());
5098 let session = session.unwrap();
5099 assert_eq!(session.name, "Test Session");
5100 assert_eq!(session.status, "active");
5101
5102 let sessions = storage
5104 .list_sessions(Some("/test/project"), None, None)
5105 .unwrap();
5106 assert_eq!(sessions.len(), 1);
5107
5108 storage
5110 .update_session_status("sess_1", "completed", "test-actor")
5111 .unwrap();
5112 let session = storage.get_session("sess_1").unwrap().unwrap();
5113 assert_eq!(session.status, "completed");
5114 assert!(session.ended_at.is_some());
5115 }
5116
5117 #[test]
5118 fn test_context_item_crud() {
5119 let mut storage = SqliteStorage::open_memory().unwrap();
5120
5121 storage
5123 .create_session("sess_1", "Test", None, None, None, "actor")
5124 .unwrap();
5125
5126 storage
5128 .save_context_item(
5129 "item_1",
5130 "sess_1",
5131 "test-key",
5132 "test value",
5133 Some("note"),
5134 Some("high"),
5135 "actor",
5136 )
5137 .unwrap();
5138
5139 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5141 assert_eq!(items.len(), 1);
5142 assert_eq!(items[0].key, "test-key");
5143 assert_eq!(items[0].priority, "high");
5144
5145 storage
5147 .save_context_item(
5148 "item_1",
5149 "sess_1",
5150 "test-key",
5151 "updated value",
5152 Some("decision"),
5153 None,
5154 "actor",
5155 )
5156 .unwrap();
5157
5158 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5159 assert_eq!(items.len(), 1);
5160 assert_eq!(items[0].value, "updated value");
5161
5162 storage
5164 .delete_context_item("sess_1", "test-key", "actor")
5165 .unwrap();
5166 let items = storage.get_context_items("sess_1", None, None, None).unwrap();
5167 assert_eq!(items.len(), 0);
5168 }
5169
5170 #[test]
5171 fn test_issue_crud() {
5172 let mut storage = SqliteStorage::open_memory().unwrap();
5173
5174 storage
5176 .create_issue(
5177 "issue_1",
5178 Some("TST-1"),
5179 "/test/project",
5180 "Test Issue",
5181 Some("Description"),
5182 None, Some("task"), Some(3), None, "actor",
5187 )
5188 .unwrap();
5189
5190 let issue = storage.get_issue("issue_1", None).unwrap();
5192 assert!(issue.is_some());
5193 let issue = issue.unwrap();
5194 assert_eq!(issue.title, "Test Issue");
5195 assert_eq!(issue.priority, 3);
5196
5197 let issue = storage
5199 .get_issue("TST-1", Some("/test/project"))
5200 .unwrap();
5201 assert!(issue.is_some());
5202
5203 let issues = storage
5205 .list_issues("/test/project", None, None, None)
5206 .unwrap();
5207 assert_eq!(issues.len(), 1);
5208
5209 storage.claim_issue("issue_1", "agent-1").unwrap();
5211 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5212 assert_eq!(issue.assigned_to_agent, Some("agent-1".to_string()));
5213 assert_eq!(issue.status, "in_progress");
5214
5215 storage.release_issue("issue_1", "agent-1").unwrap();
5217 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5218 assert!(issue.assigned_to_agent.is_none());
5219 assert_eq!(issue.status, "open");
5220
5221 storage
5223 .update_issue_status("issue_1", "closed", "actor")
5224 .unwrap();
5225 let issue = storage.get_issue("issue_1", None).unwrap().unwrap();
5226 assert_eq!(issue.status, "closed");
5227 assert!(issue.closed_at.is_some());
5228 }
5229
5230 #[test]
5233 fn test_get_items_without_embeddings_includes_pending() {
5234 let mut storage = SqliteStorage::open_memory().unwrap();
5235 storage
5236 .create_session("sess_1", "Test", None, None, None, "actor")
5237 .unwrap();
5238
5239 for (id, key, status) in [
5241 ("item_1", "none-status", "none"),
5242 ("item_2", "pending-status", "pending"),
5243 ("item_3", "error-status", "error"),
5244 ("item_4", "complete-status", "complete"),
5245 ] {
5246 storage
5247 .save_context_item(id, "sess_1", key, "test value", Some("note"), Some("normal"), "actor")
5248 .unwrap();
5249 storage.conn.execute(
5250 "UPDATE context_items SET embedding_status = ?1 WHERE id = ?2",
5251 rusqlite::params![status, id],
5252 ).unwrap();
5253 }
5254
5255 storage
5257 .save_context_item("item_5", "sess_1", "null-status", "test", Some("note"), Some("normal"), "actor")
5258 .unwrap();
5259 storage.conn.execute(
5260 "UPDATE context_items SET embedding_status = NULL WHERE id = 'item_5'",
5261 [],
5262 ).unwrap();
5263
5264 let items = storage.get_items_without_embeddings(None, None).unwrap();
5265 let keys: Vec<&str> = items.iter().map(|i| i.key.as_str()).collect();
5266
5267 assert!(keys.contains(&"none-status"), "missing 'none' status");
5269 assert!(keys.contains(&"pending-status"), "missing 'pending' status");
5270 assert!(keys.contains(&"error-status"), "missing 'error' status");
5271 assert!(keys.contains(&"null-status"), "missing NULL status");
5272
5273 assert!(!keys.contains(&"complete-status"), "'complete' should be excluded");
5275 assert_eq!(items.len(), 4);
5276 }
5277
5278 #[test]
5279 fn test_get_items_without_embeddings_session_filter() {
5280 let mut storage = SqliteStorage::open_memory().unwrap();
5281 storage.create_session("sess_1", "Session 1", None, None, None, "actor").unwrap();
5282 storage.create_session("sess_2", "Session 2", None, None, None, "actor").unwrap();
5283
5284 storage.save_context_item("item_1", "sess_1", "s1-item", "val", Some("note"), Some("normal"), "actor").unwrap();
5285 storage.save_context_item("item_2", "sess_2", "s2-item", "val", Some("note"), Some("normal"), "actor").unwrap();
5286
5287 storage.conn.execute("UPDATE context_items SET embedding_status = 'pending'", []).unwrap();
5289
5290 let s1_items = storage.get_items_without_embeddings(Some("sess_1"), None).unwrap();
5292 assert_eq!(s1_items.len(), 1);
5293 assert_eq!(s1_items[0].key, "s1-item");
5294
5295 let all_items = storage.get_items_without_embeddings(None, None).unwrap();
5297 assert_eq!(all_items.len(), 2);
5298 }
5299
5300 #[test]
5301 fn test_resync_embedding_status() {
5302 let mut storage = SqliteStorage::open_memory().unwrap();
5303 storage.create_session("sess_1", "Test", None, None, None, "actor").unwrap();
5304
5305 storage.save_context_item("item_1", "sess_1", "phantom", "val", Some("note"), Some("normal"), "actor").unwrap();
5307 storage.save_context_item("item_2", "sess_1", "real", "val", Some("note"), Some("normal"), "actor").unwrap();
5308 storage.save_context_item("item_3", "sess_1", "pending-already", "val", Some("note"), Some("normal"), "actor").unwrap();
5309
5310 storage.conn.execute("UPDATE context_items SET embedding_status = 'complete'", []).unwrap();
5312 storage.conn.execute("UPDATE context_items SET embedding_status = 'pending' WHERE id = 'item_3'", []).unwrap();
5314
5315 storage.conn.execute(
5317 "INSERT INTO embedding_chunks (id, item_id, chunk_index, chunk_text, embedding, dimensions, provider, model, created_at)
5318 VALUES ('ec_1', 'item_2', 0, 'test', X'00000000', 1, 'test', 'test-model', 1000)",
5319 [],
5320 ).unwrap();
5321
5322 let count = storage.resync_embedding_status().unwrap();
5324 assert_eq!(count, 1, "only item_1 should be reset (phantom complete)");
5325
5326 let status_1: String = storage.conn.query_row(
5328 "SELECT embedding_status FROM context_items WHERE id = 'item_1'", [], |r| r.get(0)
5329 ).unwrap();
5330 assert_eq!(status_1, "pending", "phantom complete should be reset");
5331
5332 let status_2: String = storage.conn.query_row(
5333 "SELECT embedding_status FROM context_items WHERE id = 'item_2'", [], |r| r.get(0)
5334 ).unwrap();
5335 assert_eq!(status_2, "complete", "real complete should be untouched");
5336
5337 let status_3: String = storage.conn.query_row(
5338 "SELECT embedding_status FROM context_items WHERE id = 'item_3'", [], |r| r.get(0)
5339 ).unwrap();
5340 assert_eq!(status_3, "pending", "already-pending should be untouched");
5341 }
5342}