Skip to main content

libgrite_core/store/
mod.rs

1use std::collections::HashSet;
2use std::fs::File;
3use std::path::Path;
4use std::time::{Duration, Instant};
5
6use fs2::FileExt;
7
8use crate::error::GriteError;
9use crate::types::context::{FileContext, ProjectContextEntry};
10use crate::types::event::IssueState;
11use crate::types::event::{DependencyType, Event, EventKind};
12use crate::types::ids::{EventId, IssueId};
13use crate::types::issue::Version;
14use crate::types::issue::{IssueProjection, IssueSummary};
15
16/// Default threshold for events since rebuild before recommending rebuild
17pub const DEFAULT_REBUILD_EVENTS_THRESHOLD: usize = 10000;
18
19/// Default threshold for days since rebuild before recommending rebuild
20pub const DEFAULT_REBUILD_DAYS_THRESHOLD: u32 = 7;
21
22/// Filter for listing issues
23#[derive(Debug, Default)]
24pub struct IssueFilter {
25    pub state: Option<IssueState>,
26    pub label: Option<String>,
27}
28
29/// Statistics about the database
30#[derive(Debug)]
31pub struct DbStats {
32    pub path: String,
33    pub size_bytes: u64,
34    pub event_count: usize,
35    pub issue_count: usize,
36    pub last_rebuild_ts: Option<u64>,
37    /// Events inserted since last rebuild
38    pub events_since_rebuild: usize,
39    /// Days since last rebuild
40    pub days_since_rebuild: Option<u32>,
41    /// Whether rebuild is recommended based on thresholds
42    pub rebuild_recommended: bool,
43}
44
45/// Statistics from a rebuild operation
46#[derive(Debug)]
47pub struct RebuildStats {
48    pub event_count: usize,
49    pub issue_count: usize,
50}
51
52/// A GriteStore with filesystem-level exclusive lock.
53///
54/// The lock is held for the lifetime of this struct and automatically
55/// released when dropped. This prevents multiple processes from opening
56/// the same sled database concurrently.
57pub struct LockedStore {
58    /// Lock file handle - flock released on drop
59    _lock_file: File,
60    /// The underlying store
61    store: GriteStore,
62}
63
64impl std::fmt::Debug for LockedStore {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("LockedStore")
67            .field("store", &"GriteStore { ... }")
68            .finish()
69    }
70}
71
72impl LockedStore {
73    /// Get a reference to the inner GriteStore
74    pub fn inner(&self) -> &GriteStore {
75        &self.store
76    }
77}
78
79impl std::ops::Deref for LockedStore {
80    type Target = GriteStore;
81
82    fn deref(&self) -> &Self::Target {
83        &self.store
84    }
85}
86
87impl std::ops::DerefMut for LockedStore {
88    fn deref_mut(&mut self) -> &mut Self::Target {
89        &mut self.store
90    }
91}
92
93/// Main storage interface backed by sled
94pub struct GriteStore {
95    db: sled::Db,
96    events: sled::Tree,
97    issue_states: sled::Tree,
98    issue_events: sled::Tree,
99    label_index: sled::Tree,
100    metadata: sled::Tree,
101    dep_forward: sled::Tree,
102    dep_reverse: sled::Tree,
103    context_files: sled::Tree,
104    context_symbols: sled::Tree,
105    context_project: sled::Tree,
106}
107
108impl GriteStore {
109    /// Open or create a store at the given path
110    pub fn open(path: &Path) -> Result<Self, GriteError> {
111        let db = sled::open(path)?;
112        let events = db.open_tree("events")?;
113        let issue_states = db.open_tree("issue_states")?;
114        let issue_events = db.open_tree("issue_events")?;
115        let label_index = db.open_tree("label_index")?;
116        let metadata = db.open_tree("metadata")?;
117        let dep_forward = db.open_tree("dep_forward")?;
118        let dep_reverse = db.open_tree("dep_reverse")?;
119        let context_files = db.open_tree("context_files")?;
120        let context_symbols = db.open_tree("context_symbols")?;
121        let context_project = db.open_tree("context_project")?;
122
123        Ok(Self {
124            db,
125            events,
126            issue_states,
127            issue_events,
128            label_index,
129            metadata,
130            dep_forward,
131            dep_reverse,
132            context_files,
133            context_symbols,
134            context_project,
135        })
136    }
137
138    /// Open store with exclusive filesystem lock (non-blocking).
139    ///
140    /// Lock file is created at `<path>.lock` (e.g., `.git/grite/actors/<id>/sled.lock`).
141    /// Returns `GriteError::DbBusy` if another process holds the lock.
142    pub fn open_locked(path: &Path) -> Result<LockedStore, GriteError> {
143        let lock_path = path.with_extension("lock");
144
145        // Create/open lock file
146        let lock_file = File::create(&lock_path)?;
147
148        // Try to acquire exclusive lock (non-blocking)
149        lock_file.try_lock_exclusive().map_err(|e| {
150            GriteError::DbBusy(format!("Database locked by another process: {}", e))
151        })?;
152
153        // Now safe to open sled
154        let store = Self::open(path)?;
155
156        Ok(LockedStore {
157            _lock_file: lock_file,
158            store,
159        })
160    }
161
162    /// Open store with exclusive filesystem lock (blocking with timeout).
163    ///
164    /// Retries with exponential backoff until the lock is acquired or timeout is reached.
165    /// Returns `GriteError::DbBusy` if timeout expires before acquiring the lock.
166    pub fn open_locked_blocking(path: &Path, timeout: Duration) -> Result<LockedStore, GriteError> {
167        let lock_path = path.with_extension("lock");
168        let lock_file = File::create(&lock_path)?;
169
170        let start = Instant::now();
171        let mut delay = Duration::from_millis(10);
172
173        loop {
174            match lock_file.try_lock_exclusive() {
175                Ok(()) => break,
176                Err(_) if start.elapsed() < timeout => {
177                    std::thread::sleep(delay);
178                    delay = (delay * 2).min(Duration::from_millis(200));
179                }
180                Err(e) => {
181                    return Err(GriteError::DbBusy(format!(
182                        "Timeout waiting for database lock: {}",
183                        e
184                    )))
185                }
186            }
187        }
188
189        let store = Self::open(path)?;
190        Ok(LockedStore {
191            _lock_file: lock_file,
192            store,
193        })
194    }
195
196    /// Insert an event and update projections
197    pub fn insert_event(&self, event: &Event) -> Result<(), GriteError> {
198        // Store the event
199        let event_key = event_key(&event.event_id);
200        let event_json = serde_json::to_vec(event)?;
201        self.events.insert(&event_key, event_json)?;
202
203        // Index by issue
204        let issue_events_key = issue_events_key(&event.issue_id, event.ts_unix_ms, &event.event_id);
205        self.issue_events.insert(&issue_events_key, &[])?;
206
207        // Update projection
208        self.update_projection(event)?;
209
210        // Increment events_since_rebuild counter
211        self.increment_events_since_rebuild()?;
212
213        Ok(())
214    }
215
216    /// Increment the events_since_rebuild counter
217    fn increment_events_since_rebuild(&self) -> Result<(), GriteError> {
218        let current = self
219            .metadata
220            .get("events_since_rebuild")?
221            .map(|bytes| {
222                let arr: [u8; 8] = bytes.as_ref().try_into().unwrap_or([0; 8]);
223                u64::from_le_bytes(arr)
224            })
225            .unwrap_or(0);
226
227        let new_count = current + 1;
228        self.metadata
229            .insert("events_since_rebuild", &new_count.to_le_bytes())?;
230        Ok(())
231    }
232
233    /// Update the issue projection for an event
234    fn update_projection(&self, event: &Event) -> Result<(), GriteError> {
235        // Handle context events separately (they don't have issue projections)
236        match &event.kind {
237            EventKind::ContextUpdated {
238                path,
239                language,
240                symbols,
241                summary,
242                content_hash,
243            } => {
244                return self.update_file_context(
245                    event,
246                    path,
247                    language,
248                    symbols,
249                    summary,
250                    content_hash,
251                );
252            }
253            EventKind::ProjectContextUpdated { key, value } => {
254                return self.update_project_context(event, key, value);
255            }
256            _ => {}
257        }
258
259        let issue_key = issue_state_key(&event.issue_id);
260
261        let mut projection = match self.issue_states.get(&issue_key)? {
262            Some(bytes) => serde_json::from_slice(&bytes)?,
263            None => {
264                // Must be IssueCreated
265                IssueProjection::from_event(event)?
266            }
267        };
268
269        // Apply event if not IssueCreated (which created the projection)
270        if self.issue_states.get(&issue_key)?.is_some() {
271            projection.apply(event)?;
272        }
273
274        // Update label index
275        for label in &projection.labels {
276            let label_key = label_index_key(label, &event.issue_id);
277            self.label_index.insert(&label_key, &[])?;
278        }
279
280        // Update dependency indexes
281        match &event.kind {
282            EventKind::DependencyAdded { target, dep_type } => {
283                let fwd = dep_forward_key(&event.issue_id, target, dep_type);
284                self.dep_forward.insert(&fwd, &[])?;
285                let rev = dep_reverse_key(target, &event.issue_id, dep_type);
286                self.dep_reverse.insert(&rev, &[])?;
287            }
288            EventKind::DependencyRemoved { target, dep_type } => {
289                let fwd = dep_forward_key(&event.issue_id, target, dep_type);
290                self.dep_forward.remove(&fwd)?;
291                let rev = dep_reverse_key(target, &event.issue_id, dep_type);
292                self.dep_reverse.remove(&rev)?;
293            }
294            _ => {}
295        }
296
297        // Store updated projection
298        let proj_json = serde_json::to_vec(&projection)?;
299        self.issue_states.insert(&issue_key, proj_json)?;
300
301        Ok(())
302    }
303
304    /// Update file context (LWW per path)
305    fn update_file_context(
306        &self,
307        event: &Event,
308        path: &str,
309        language: &str,
310        symbols: &[crate::types::event::SymbolInfo],
311        summary: &str,
312        content_hash: &[u8; 32],
313    ) -> Result<(), GriteError> {
314        let file_key = context_file_key(path);
315        let new_version = Version::new(event.ts_unix_ms, event.actor, event.event_id);
316
317        let should_update = match self.context_files.get(&file_key)? {
318            Some(existing_bytes) => {
319                let existing: FileContext = serde_json::from_slice(&existing_bytes)?;
320                new_version.is_newer_than(&existing.version)
321            }
322            None => true,
323        };
324
325        if should_update {
326            // Remove old symbol index entries for this path
327            let sym_path_suffix = format!("/{}", path);
328            for result in self.context_symbols.iter() {
329                let (key, _) = result?;
330                if let Ok(key_str) = std::str::from_utf8(&key) {
331                    if key_str.ends_with(&sym_path_suffix) {
332                        self.context_symbols.remove(&key)?;
333                    }
334                }
335            }
336
337            let ctx = FileContext {
338                path: path.to_string(),
339                language: language.to_string(),
340                symbols: symbols.to_vec(),
341                summary: summary.to_string(),
342                content_hash: *content_hash,
343                version: new_version,
344            };
345
346            // Insert file context
347            self.context_files
348                .insert(&file_key, serde_json::to_vec(&ctx)?)?;
349
350            // Insert symbol index entries
351            for sym in symbols {
352                let sym_key = context_symbol_key(&sym.name, path);
353                self.context_symbols.insert(&sym_key, &[])?;
354            }
355        }
356
357        Ok(())
358    }
359
360    /// Update project context (LWW per key)
361    fn update_project_context(
362        &self,
363        event: &Event,
364        key: &str,
365        value: &str,
366    ) -> Result<(), GriteError> {
367        let proj_key = context_project_key(key);
368        let new_version = Version::new(event.ts_unix_ms, event.actor, event.event_id);
369
370        let should_update = match self.context_project.get(&proj_key)? {
371            Some(existing_bytes) => {
372                let existing: ProjectContextEntry = serde_json::from_slice(&existing_bytes)?;
373                new_version.is_newer_than(&existing.version)
374            }
375            None => true,
376        };
377
378        if should_update {
379            let entry = ProjectContextEntry {
380                value: value.to_string(),
381                version: new_version,
382            };
383            self.context_project
384                .insert(&proj_key, serde_json::to_vec(&entry)?)?;
385        }
386
387        Ok(())
388    }
389
390    /// Get an event by ID
391    pub fn get_event(&self, event_id: &EventId) -> Result<Option<Event>, GriteError> {
392        let key = event_key(event_id);
393        match self.events.get(&key)? {
394            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
395            None => Ok(None),
396        }
397    }
398
399    /// Resolve a hex prefix to a full issue ID
400    ///
401    /// If the prefix is already a full 32-char hex ID, parses it directly.
402    /// Otherwise, scans the issue_states tree for matching prefixes.
403    /// Returns an error if the prefix is ambiguous (matches multiple issues).
404    pub fn resolve_issue_id(&self, hex_prefix: &str) -> Result<IssueId, GriteError> {
405        use crate::types::ids::{hex_to_id, id_to_hex};
406
407        // Full ID — parse directly
408        if hex_prefix.len() == 32 {
409            return hex_to_id::<16>(hex_prefix).map_err(|e| GriteError::InvalidArgs(e.to_string()));
410        }
411
412        // Validate hex characters
413        if !hex_prefix.chars().all(|c| c.is_ascii_hexdigit()) {
414            return Err(GriteError::InvalidArgs(format!(
415                "invalid hex prefix: {}",
416                hex_prefix
417            )));
418        }
419
420        if hex_prefix.len() < 4 {
421            return Err(GriteError::InvalidArgs(
422                "issue ID prefix must be at least 4 characters".to_string(),
423            ));
424        }
425
426        // Decode the prefix bytes for sled scan_prefix
427        // Pad odd-length prefixes with 0 for byte-level prefix scanning
428        let prefix_lower = hex_prefix.to_ascii_lowercase();
429        let full_byte_len = prefix_lower.len() / 2;
430        let prefix_bytes = hex::decode(&prefix_lower[..full_byte_len * 2])
431            .map_err(|e| GriteError::InvalidArgs(format!("invalid hex: {}", e)))?;
432
433        let mut scan_key = Vec::with_capacity(12 + prefix_bytes.len());
434        scan_key.extend_from_slice(b"issue_state/");
435        scan_key.extend_from_slice(&prefix_bytes);
436
437        let mut matches = Vec::new();
438        for result in self.issue_states.scan_prefix(&scan_key) {
439            let (key, _) = result?;
440            if key.len() != 12 + 16 {
441                continue;
442            }
443            let mut id = [0u8; 16];
444            id.copy_from_slice(&key[12..]);
445            let hex = id_to_hex(&id);
446            // Check the full hex prefix (handles odd-length prefixes)
447            if hex.starts_with(&prefix_lower) {
448                matches.push(id);
449            }
450        }
451
452        match matches.len() {
453            0 => Err(GriteError::NotFound(format!(
454                "no issue matching prefix {}",
455                hex_prefix
456            ))),
457            1 => Ok(matches[0]),
458            n => Err(GriteError::InvalidArgs(format!(
459                "ambiguous prefix {} matches {} issues",
460                hex_prefix, n
461            ))),
462        }
463    }
464
465    /// Get an issue projection by ID
466    pub fn get_issue(&self, issue_id: &IssueId) -> Result<Option<IssueProjection>, GriteError> {
467        let key = issue_state_key(issue_id);
468        match self.issue_states.get(&key)? {
469            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
470            None => Ok(None),
471        }
472    }
473
474    /// List issues with optional filtering
475    pub fn list_issues(&self, filter: &IssueFilter) -> Result<Vec<IssueSummary>, GriteError> {
476        let mut summaries = Vec::new();
477
478        for result in self.issue_states.iter() {
479            let (_, value) = result?;
480            let proj: IssueProjection = serde_json::from_slice(&value)?;
481
482            // Apply filters
483            if let Some(state) = filter.state {
484                if proj.state != state {
485                    continue;
486                }
487            }
488            if let Some(ref label) = filter.label {
489                if !proj.labels.contains(label) {
490                    continue;
491                }
492            }
493
494            summaries.push(IssueSummary::from(&proj));
495        }
496
497        // Sort by creation time (oldest first)
498        summaries.sort_by_key(|s| s.created_ts);
499
500        Ok(summaries)
501    }
502
503    /// Get all events for an issue, sorted by (ts, actor, event_id)
504    pub fn get_issue_events(&self, issue_id: &IssueId) -> Result<Vec<Event>, GriteError> {
505        let prefix = issue_events_prefix(issue_id);
506        let mut events = Vec::new();
507
508        for result in self.issue_events.scan_prefix(&prefix) {
509            let (key, _) = result?;
510            // Extract event_id from key
511            let event_id = extract_event_id_from_issue_events_key(&key)?;
512            if let Some(event) = self.get_event(&event_id)? {
513                events.push(event);
514            }
515        }
516
517        // Sort by (ts, actor, event_id)
518        events.sort_by(|a, b| {
519            (a.ts_unix_ms, &a.actor, &a.event_id).cmp(&(b.ts_unix_ms, &b.actor, &b.event_id))
520        });
521
522        Ok(events)
523    }
524
525    /// Get all events in the store
526    pub fn get_all_events(&self) -> Result<Vec<Event>, GriteError> {
527        let mut events = Vec::new();
528        for result in self.events.iter() {
529            let (_, value) = result?;
530            let event: Event = serde_json::from_slice(&value)?;
531            events.push(event);
532        }
533        // Sort by (issue_id, ts, actor, event_id)
534        events.sort_by(|a, b| {
535            (&a.issue_id, a.ts_unix_ms, &a.actor, &a.event_id).cmp(&(
536                &b.issue_id,
537                b.ts_unix_ms,
538                &b.actor,
539                &b.event_id,
540            ))
541        });
542        Ok(events)
543    }
544
545    /// Rebuild all projections from events
546    pub fn rebuild(&self) -> Result<RebuildStats, GriteError> {
547        // Clear existing projections and indexes
548        self.issue_states.clear()?;
549        self.label_index.clear()?;
550        self.dep_forward.clear()?;
551        self.dep_reverse.clear()?;
552        self.context_files.clear()?;
553        self.context_symbols.clear()?;
554        self.context_project.clear()?;
555
556        // Collect all events
557        let mut events = self.get_all_events()?;
558
559        // Sort events by (issue_id, ts, actor, event_id) for deterministic ordering
560        events.sort_by(|a, b| {
561            (&a.issue_id, a.ts_unix_ms, &a.actor, &a.event_id).cmp(&(
562                &b.issue_id,
563                b.ts_unix_ms,
564                &b.actor,
565                &b.event_id,
566            ))
567        });
568
569        // Rebuild projections
570        for event in &events {
571            self.update_projection(event)?;
572        }
573
574        let issue_count = self.issue_states.len();
575
576        // Update rebuild timestamp and reset counter
577        let now = std::time::SystemTime::now()
578            .duration_since(std::time::UNIX_EPOCH)
579            .unwrap_or_default()
580            .as_millis() as u64;
581        self.metadata
582            .insert("last_rebuild_ts", &now.to_le_bytes())?;
583        self.metadata
584            .insert("events_since_rebuild", &0u64.to_le_bytes())?;
585
586        Ok(RebuildStats {
587            event_count: events.len(),
588            issue_count,
589        })
590    }
591
592    /// Rebuild all projections from provided events (for snapshot-based rebuild)
593    ///
594    /// This is useful when rebuilding from a snapshot + WAL combination,
595    /// where events come from external sources rather than the local store.
596    pub fn rebuild_from_events(&self, events: &[Event]) -> Result<RebuildStats, GriteError> {
597        // Clear existing projections, indexes, and events
598        self.issue_states.clear()?;
599        self.label_index.clear()?;
600        self.dep_forward.clear()?;
601        self.dep_reverse.clear()?;
602        self.context_files.clear()?;
603        self.context_symbols.clear()?;
604        self.context_project.clear()?;
605        self.events.clear()?;
606
607        // Sort events by (issue_id, ts, actor, event_id) for deterministic ordering
608        let mut sorted_events: Vec<_> = events.to_vec();
609        sorted_events.sort_by(|a, b| {
610            (&a.issue_id, a.ts_unix_ms, &a.actor, &a.event_id).cmp(&(
611                &b.issue_id,
612                b.ts_unix_ms,
613                &b.actor,
614                &b.event_id,
615            ))
616        });
617
618        // Insert events and rebuild projections
619        for event in &sorted_events {
620            // Insert event into store
621            let ev_key = event_key(&event.event_id);
622            let event_json = serde_json::to_vec(event)?;
623            self.events.insert(&ev_key, event_json)?;
624
625            // Index by issue
626            let ie_key = issue_events_key(&event.issue_id, event.ts_unix_ms, &event.event_id);
627            self.issue_events.insert(&ie_key, &[])?;
628
629            // Rebuild projection (handles deps, context, labels)
630            self.update_projection(event)?;
631        }
632
633        let issue_count = self.issue_states.len();
634
635        // Update rebuild timestamp and reset counter
636        let now = std::time::SystemTime::now()
637            .duration_since(std::time::UNIX_EPOCH)
638            .unwrap_or_default()
639            .as_millis() as u64;
640        self.metadata
641            .insert("last_rebuild_ts", &now.to_le_bytes())?;
642        self.metadata
643            .insert("events_since_rebuild", &0u64.to_le_bytes())?;
644
645        Ok(RebuildStats {
646            event_count: sorted_events.len(),
647            issue_count,
648        })
649    }
650
651    /// Get database statistics
652    pub fn stats(&self, path: &Path) -> Result<DbStats, GriteError> {
653        let event_count = self.events.len();
654        let issue_count = self.issue_states.len();
655
656        // Calculate size by walking the directory
657        let size_bytes = dir_size(path).unwrap_or(0);
658
659        let last_rebuild_ts = self.metadata.get("last_rebuild_ts")?.map(|bytes| {
660            let arr: [u8; 8] = bytes.as_ref().try_into().unwrap_or([0; 8]);
661            u64::from_le_bytes(arr)
662        });
663
664        let events_since_rebuild = self
665            .metadata
666            .get("events_since_rebuild")?
667            .map(|bytes| {
668                let arr: [u8; 8] = bytes.as_ref().try_into().unwrap_or([0; 8]);
669                u64::from_le_bytes(arr) as usize
670            })
671            .unwrap_or(event_count); // If never rebuilt, assume all events are since rebuild
672
673        // Calculate days since last rebuild
674        let now_ms = std::time::SystemTime::now()
675            .duration_since(std::time::UNIX_EPOCH)
676            .unwrap_or_default()
677            .as_millis() as u64;
678
679        let days_since_rebuild = last_rebuild_ts.map(|ts| {
680            let ms_diff = now_ms.saturating_sub(ts);
681            (ms_diff / (24 * 60 * 60 * 1000)) as u32
682        });
683
684        // Recommend rebuild if events > 10000 or days > 7
685        let rebuild_recommended = events_since_rebuild > DEFAULT_REBUILD_EVENTS_THRESHOLD
686            || days_since_rebuild
687                .map(|d| d > DEFAULT_REBUILD_DAYS_THRESHOLD)
688                .unwrap_or(false);
689
690        Ok(DbStats {
691            path: path.to_string_lossy().to_string(),
692            size_bytes,
693            event_count,
694            issue_count,
695            last_rebuild_ts,
696            events_since_rebuild,
697            days_since_rebuild,
698            rebuild_recommended,
699        })
700    }
701
702    // --- Dependency Query Methods ---
703
704    /// Get all outgoing dependencies for an issue
705    pub fn get_dependencies(
706        &self,
707        issue_id: &IssueId,
708    ) -> Result<Vec<(IssueId, DependencyType)>, GriteError> {
709        let prefix = dep_forward_prefix(issue_id);
710        let mut deps = Vec::new();
711
712        for result in self.dep_forward.scan_prefix(&prefix) {
713            let (key, _) = result?;
714            if let Some((target, dep_type)) = parse_dep_key_suffix(&key, prefix.len()) {
715                deps.push((target, dep_type));
716            }
717        }
718
719        Ok(deps)
720    }
721
722    /// Get all incoming dependencies (what depends on this issue)
723    pub fn get_dependents(
724        &self,
725        issue_id: &IssueId,
726    ) -> Result<Vec<(IssueId, DependencyType)>, GriteError> {
727        let prefix = dep_reverse_prefix(issue_id);
728        let mut deps = Vec::new();
729
730        for result in self.dep_reverse.scan_prefix(&prefix) {
731            let (key, _) = result?;
732            if let Some((source, dep_type)) = parse_dep_key_suffix(&key, prefix.len()) {
733                deps.push((source, dep_type));
734            }
735        }
736
737        Ok(deps)
738    }
739
740    /// Check if adding a dependency would create a cycle.
741    /// Only checks for Blocks/DependsOn (acyclic types).
742    pub fn would_create_cycle(
743        &self,
744        source: &IssueId,
745        target: &IssueId,
746        dep_type: &DependencyType,
747    ) -> Result<bool, GriteError> {
748        if !dep_type.is_acyclic() {
749            return Ok(false);
750        }
751
752        // DFS from target: can we reach source via forward deps?
753        let mut visited = HashSet::new();
754        let mut stack = vec![*target];
755
756        while let Some(current) = stack.pop() {
757            if current == *source {
758                return Ok(true);
759            }
760            if !visited.insert(current) {
761                continue;
762            }
763            for (dep_target, dt) in self.get_dependencies(&current)? {
764                if dt == *dep_type {
765                    stack.push(dep_target);
766                }
767            }
768        }
769
770        Ok(false)
771    }
772
773    /// Get issues in topological order based on dependency relationships.
774    /// Issues with no dependencies come first.
775    pub fn topological_order(&self, filter: &IssueFilter) -> Result<Vec<IssueSummary>, GriteError> {
776        let issues = self.list_issues(filter)?;
777        let issue_ids: HashSet<IssueId> = issues.iter().map(|i| i.issue_id).collect();
778
779        // Build in-degree map (only count edges within the filtered set)
780        let mut in_degree: std::collections::HashMap<IssueId, usize> =
781            std::collections::HashMap::new();
782        let mut adj: std::collections::HashMap<IssueId, Vec<IssueId>> =
783            std::collections::HashMap::new();
784
785        for issue in &issues {
786            in_degree.entry(issue.issue_id).or_insert(0);
787            adj.entry(issue.issue_id).or_default();
788
789            for (target, dep_type) in self.get_dependencies(&issue.issue_id)? {
790                if dep_type.is_acyclic() && issue_ids.contains(&target) {
791                    // issue depends on target, so target must come first
792                    adj.entry(target).or_default().push(issue.issue_id);
793                    *in_degree.entry(issue.issue_id).or_insert(0) += 1;
794                }
795            }
796        }
797
798        // Kahn's algorithm
799        let mut queue: std::collections::VecDeque<IssueId> = in_degree
800            .iter()
801            .filter(|(_, &deg)| deg == 0)
802            .map(|(&id, _)| id)
803            .collect();
804
805        let mut sorted_ids = Vec::new();
806        while let Some(id) = queue.pop_front() {
807            sorted_ids.push(id);
808            if let Some(neighbors) = adj.get(&id) {
809                for &neighbor in neighbors {
810                    if let Some(deg) = in_degree.get_mut(&neighbor) {
811                        *deg -= 1;
812                        if *deg == 0 {
813                            queue.push_back(neighbor);
814                        }
815                    }
816                }
817            }
818        }
819
820        // Any remaining issues (cycles) go at the end
821        for issue in &issues {
822            if !sorted_ids.contains(&issue.issue_id) {
823                sorted_ids.push(issue.issue_id);
824            }
825        }
826
827        // Map back to summaries in sorted order
828        let issue_map: std::collections::HashMap<IssueId, &IssueSummary> =
829            issues.iter().map(|i| (i.issue_id, i)).collect();
830        let result = sorted_ids
831            .iter()
832            .filter_map(|id| issue_map.get(id).map(|s| (*s).clone()))
833            .collect();
834
835        Ok(result)
836    }
837
838    // --- Context Query Methods ---
839
840    /// Get file context for a specific path
841    pub fn get_file_context(&self, path: &str) -> Result<Option<FileContext>, GriteError> {
842        let key = context_file_key(path);
843        match self.context_files.get(&key)? {
844            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
845            None => Ok(None),
846        }
847    }
848
849    /// Query symbols by name prefix
850    pub fn query_symbols(&self, query: &str) -> Result<Vec<(String, String)>, GriteError> {
851        let prefix = context_symbol_prefix(query);
852        let mut results = Vec::new();
853
854        for result in self.context_symbols.scan_prefix(&prefix) {
855            let (key, _) = result?;
856            if let Ok(key_str) = std::str::from_utf8(&key) {
857                // Key format: "ctx/sym/<name>/<path>"
858                if let Some(rest) = key_str.strip_prefix("ctx/sym/") {
859                    if let Some(slash_pos) = rest.find('/') {
860                        let name = rest[..slash_pos].to_string();
861                        let path = rest[slash_pos + 1..].to_string();
862                        results.push((name, path));
863                    }
864                }
865            }
866        }
867
868        Ok(results)
869    }
870
871    /// List all indexed file paths
872    pub fn list_context_files(&self) -> Result<Vec<String>, GriteError> {
873        let mut paths = Vec::new();
874        for result in self.context_files.iter() {
875            let (key, _) = result?;
876            if let Ok(key_str) = std::str::from_utf8(&key) {
877                if let Some(path) = key_str.strip_prefix("ctx/file/") {
878                    paths.push(path.to_string());
879                }
880            }
881        }
882        Ok(paths)
883    }
884
885    /// Get a project context entry by key
886    pub fn get_project_context(
887        &self,
888        key: &str,
889    ) -> Result<Option<ProjectContextEntry>, GriteError> {
890        let k = context_project_key(key);
891        match self.context_project.get(&k)? {
892            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
893            None => Ok(None),
894        }
895    }
896
897    /// List all project context entries
898    pub fn list_project_context(&self) -> Result<Vec<(String, ProjectContextEntry)>, GriteError> {
899        let mut entries = Vec::new();
900        for result in self.context_project.iter() {
901            let (key, value) = result?;
902            if let Ok(key_str) = std::str::from_utf8(&key) {
903                if let Some(k) = key_str.strip_prefix("ctx/proj/") {
904                    let entry: ProjectContextEntry = serde_json::from_slice(&value)?;
905                    entries.push((k.to_string(), entry));
906                }
907            }
908        }
909        Ok(entries)
910    }
911
912    /// Flush pending writes to disk
913    pub fn flush(&self) -> Result<(), GriteError> {
914        self.db.flush()?;
915        Ok(())
916    }
917}
918
919// Key construction helpers
920
921fn event_key(event_id: &EventId) -> Vec<u8> {
922    let mut key = Vec::with_capacity(6 + 32);
923    key.extend_from_slice(b"event/");
924    key.extend_from_slice(event_id);
925    key
926}
927
928fn issue_state_key(issue_id: &IssueId) -> Vec<u8> {
929    let mut key = Vec::with_capacity(12 + 16);
930    key.extend_from_slice(b"issue_state/");
931    key.extend_from_slice(issue_id);
932    key
933}
934
935fn issue_events_prefix(issue_id: &IssueId) -> Vec<u8> {
936    let mut key = Vec::with_capacity(13 + 16);
937    key.extend_from_slice(b"issue_events/");
938    key.extend_from_slice(issue_id);
939    key.push(b'/');
940    key
941}
942
943fn issue_events_key(issue_id: &IssueId, ts: u64, event_id: &EventId) -> Vec<u8> {
944    let mut key = issue_events_prefix(issue_id);
945    key.extend_from_slice(&ts.to_be_bytes());
946    key.push(b'/');
947    key.extend_from_slice(event_id);
948    key
949}
950
951fn label_index_key(label: &str, issue_id: &IssueId) -> Vec<u8> {
952    let mut key = Vec::with_capacity(12 + label.len() + 1 + 16);
953    key.extend_from_slice(b"label_index/");
954    key.extend_from_slice(label.as_bytes());
955    key.push(b'/');
956    key.extend_from_slice(issue_id);
957    key
958}
959
960// Dependency key helpers
961
962fn dep_type_to_byte(dep_type: &DependencyType) -> u8 {
963    match dep_type {
964        DependencyType::Blocks => b'B',
965        DependencyType::DependsOn => b'D',
966        DependencyType::RelatedTo => b'R',
967    }
968}
969
970fn byte_to_dep_type(b: u8) -> Option<DependencyType> {
971    match b {
972        b'B' => Some(DependencyType::Blocks),
973        b'D' => Some(DependencyType::DependsOn),
974        b'R' => Some(DependencyType::RelatedTo),
975        _ => None,
976    }
977}
978
979fn dep_forward_prefix(source: &IssueId) -> Vec<u8> {
980    let mut key = Vec::with_capacity(8 + 16 + 1);
981    key.extend_from_slice(b"dep_fwd/");
982    key.extend_from_slice(source);
983    key.push(b'/');
984    key
985}
986
987fn dep_forward_key(source: &IssueId, target: &IssueId, dep_type: &DependencyType) -> Vec<u8> {
988    let mut key = dep_forward_prefix(source);
989    key.extend_from_slice(target);
990    key.push(b'/');
991    key.push(dep_type_to_byte(dep_type));
992    key
993}
994
995fn dep_reverse_prefix(target: &IssueId) -> Vec<u8> {
996    let mut key = Vec::with_capacity(8 + 16 + 1);
997    key.extend_from_slice(b"dep_rev/");
998    key.extend_from_slice(target);
999    key.push(b'/');
1000    key
1001}
1002
1003fn dep_reverse_key(target: &IssueId, source: &IssueId, dep_type: &DependencyType) -> Vec<u8> {
1004    let mut key = dep_reverse_prefix(target);
1005    key.extend_from_slice(source);
1006    key.push(b'/');
1007    key.push(dep_type_to_byte(dep_type));
1008    key
1009}
1010
1011/// Parse the suffix of a dep key (after the prefix) to extract target/source and dep_type
1012fn parse_dep_key_suffix(key: &[u8], prefix_len: usize) -> Option<(IssueId, DependencyType)> {
1013    // Suffix format: <issue_id 16 bytes> / <dep_type 1 byte>
1014    let suffix = &key[prefix_len..];
1015    if suffix.len() != 16 + 1 + 1 {
1016        return None;
1017    }
1018    let mut issue_id = [0u8; 16];
1019    issue_id.copy_from_slice(&suffix[..16]);
1020    // suffix[16] is '/'
1021    let dep_type = byte_to_dep_type(suffix[17])?;
1022    Some((issue_id, dep_type))
1023}
1024
1025// Context key helpers
1026
1027fn context_file_key(path: &str) -> Vec<u8> {
1028    let mut key = Vec::new();
1029    key.extend_from_slice(b"ctx/file/");
1030    key.extend_from_slice(path.as_bytes());
1031    key
1032}
1033
1034fn context_symbol_prefix(name: &str) -> Vec<u8> {
1035    let mut key = Vec::new();
1036    key.extend_from_slice(b"ctx/sym/");
1037    key.extend_from_slice(name.as_bytes());
1038    key
1039}
1040
1041fn context_symbol_key(name: &str, path: &str) -> Vec<u8> {
1042    let mut key = context_symbol_prefix(name);
1043    key.push(b'/');
1044    key.extend_from_slice(path.as_bytes());
1045    key
1046}
1047
1048fn context_project_key(key_name: &str) -> Vec<u8> {
1049    let mut key = Vec::new();
1050    key.extend_from_slice(b"ctx/proj/");
1051    key.extend_from_slice(key_name.as_bytes());
1052    key
1053}
1054
1055fn extract_event_id_from_issue_events_key(key: &[u8]) -> Result<EventId, GriteError> {
1056    // Key format: "issue_events/" + issue_id (16) + "/" + ts (8) + "/" + event_id (32)
1057    // Total: 13 + 16 + 1 + 8 + 1 + 32 = 71
1058    if key.len() < 71 {
1059        return Err(GriteError::Internal("Invalid issue_events key".to_string()));
1060    }
1061    let event_id_start = key.len() - 32;
1062    let mut event_id = [0u8; 32];
1063    event_id.copy_from_slice(&key[event_id_start..]);
1064    Ok(event_id)
1065}
1066
1067fn dir_size(path: &Path) -> std::io::Result<u64> {
1068    let mut size = 0;
1069    if path.is_dir() {
1070        for entry in std::fs::read_dir(path)? {
1071            let entry = entry?;
1072            let meta = entry.metadata()?;
1073            if meta.is_dir() {
1074                size += dir_size(&entry.path())?;
1075            } else {
1076                size += meta.len();
1077            }
1078        }
1079    }
1080    Ok(size)
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085    use super::*;
1086    use crate::hash::compute_event_id;
1087    use crate::types::event::EventKind;
1088    use crate::types::ids::generate_issue_id;
1089    use tempfile::tempdir;
1090
1091    fn make_event(issue_id: IssueId, actor: [u8; 16], ts: u64, kind: EventKind) -> Event {
1092        let event_id = compute_event_id(&issue_id, &actor, ts, None, &kind);
1093        Event::new(event_id, issue_id, actor, ts, None, kind)
1094    }
1095
1096    #[test]
1097    fn test_store_basic_operations() {
1098        let dir = tempdir().unwrap();
1099        let store = GriteStore::open(dir.path()).unwrap();
1100
1101        let issue_id = generate_issue_id();
1102        let actor = [1u8; 16];
1103
1104        // Create an issue
1105        let create_event = make_event(
1106            issue_id,
1107            actor,
1108            1000,
1109            EventKind::IssueCreated {
1110                title: "Test Issue".to_string(),
1111                body: "Test body".to_string(),
1112                labels: vec!["bug".to_string()],
1113            },
1114        );
1115
1116        store.insert_event(&create_event).unwrap();
1117
1118        // Verify event was stored
1119        let retrieved = store.get_event(&create_event.event_id).unwrap().unwrap();
1120        assert_eq!(retrieved.event_id, create_event.event_id);
1121
1122        // Verify projection was created
1123        let proj = store.get_issue(&issue_id).unwrap().unwrap();
1124        assert_eq!(proj.title, "Test Issue");
1125        assert!(proj.labels.contains("bug"));
1126    }
1127
1128    #[test]
1129    fn test_store_list_issues() {
1130        let dir = tempdir().unwrap();
1131        let store = GriteStore::open(dir.path()).unwrap();
1132
1133        let actor = [1u8; 16];
1134
1135        // Create two issues
1136        for i in 0..2 {
1137            let issue_id = generate_issue_id();
1138            let event = make_event(
1139                issue_id,
1140                actor,
1141                1000 + i,
1142                EventKind::IssueCreated {
1143                    title: format!("Issue {}", i),
1144                    body: "Body".to_string(),
1145                    labels: vec![],
1146                },
1147            );
1148            store.insert_event(&event).unwrap();
1149        }
1150
1151        let issues = store.list_issues(&IssueFilter::default()).unwrap();
1152        assert_eq!(issues.len(), 2);
1153    }
1154
1155    #[test]
1156    fn test_store_rebuild() {
1157        let dir = tempdir().unwrap();
1158        let store = GriteStore::open(dir.path()).unwrap();
1159
1160        let issue_id = generate_issue_id();
1161        let actor = [1u8; 16];
1162
1163        // Create and modify an issue
1164        let events = vec![
1165            make_event(
1166                issue_id,
1167                actor,
1168                1000,
1169                EventKind::IssueCreated {
1170                    title: "Test".to_string(),
1171                    body: "Body".to_string(),
1172                    labels: vec![],
1173                },
1174            ),
1175            make_event(
1176                issue_id,
1177                actor,
1178                2000,
1179                EventKind::IssueUpdated {
1180                    title: Some("Updated".to_string()),
1181                    body: None,
1182                },
1183            ),
1184        ];
1185
1186        for event in &events {
1187            store.insert_event(event).unwrap();
1188        }
1189
1190        // Get projection before rebuild
1191        let proj_before = store.get_issue(&issue_id).unwrap().unwrap();
1192        assert_eq!(proj_before.title, "Updated");
1193
1194        // Rebuild
1195        let stats = store.rebuild().unwrap();
1196        assert_eq!(stats.event_count, 2);
1197        assert_eq!(stats.issue_count, 1);
1198
1199        // Verify projection is the same after rebuild
1200        let proj_after = store.get_issue(&issue_id).unwrap().unwrap();
1201        assert_eq!(proj_after.title, "Updated");
1202    }
1203
1204    #[test]
1205    fn test_locked_store_creates_lock_file() {
1206        let dir = tempdir().unwrap();
1207        let store_path = dir.path().join("sled");
1208        let lock_path = dir.path().join("sled.lock");
1209
1210        // Lock file shouldn't exist yet
1211        assert!(!lock_path.exists());
1212
1213        // Open locked store
1214        let _store = GriteStore::open_locked(&store_path).unwrap();
1215
1216        // Lock file should now exist
1217        assert!(lock_path.exists());
1218    }
1219
1220    #[test]
1221    fn test_locked_store_second_open_fails() {
1222        let dir = tempdir().unwrap();
1223        let store_path = dir.path().join("sled");
1224
1225        // First open succeeds
1226        let _store1 = GriteStore::open_locked(&store_path).unwrap();
1227
1228        // Second open should fail with DbBusy
1229        let result = GriteStore::open_locked(&store_path);
1230        assert!(result.is_err());
1231        match result.unwrap_err() {
1232            GriteError::DbBusy(msg) => {
1233                assert!(msg.contains("locked"));
1234            }
1235            other => panic!("Expected DbBusy error, got {:?}", other),
1236        }
1237    }
1238
1239    #[test]
1240    fn test_locked_store_released_on_drop() {
1241        let dir = tempdir().unwrap();
1242        let store_path = dir.path().join("sled");
1243
1244        // First open
1245        {
1246            let _store = GriteStore::open_locked(&store_path).unwrap();
1247            // Store is dropped here
1248        }
1249
1250        // Second open should succeed after drop
1251        let _store2 = GriteStore::open_locked(&store_path).unwrap();
1252    }
1253
1254    #[test]
1255    fn test_locked_store_blocking_timeout() {
1256        let dir = tempdir().unwrap();
1257        let store_path = dir.path().join("sled");
1258
1259        // First open succeeds
1260        let _store1 = GriteStore::open_locked(&store_path).unwrap();
1261
1262        // Blocking open with very short timeout should fail
1263        let result = GriteStore::open_locked_blocking(&store_path, Duration::from_millis(50));
1264        assert!(result.is_err());
1265        match result.unwrap_err() {
1266            GriteError::DbBusy(msg) => {
1267                assert!(msg.contains("Timeout"));
1268            }
1269            other => panic!("Expected DbBusy timeout error, got {:?}", other),
1270        }
1271    }
1272
1273    #[test]
1274    fn test_locked_store_deref_access() {
1275        let dir = tempdir().unwrap();
1276        let store_path = dir.path().join("sled");
1277
1278        let store = GriteStore::open_locked(&store_path).unwrap();
1279
1280        // Verify we can access GriteStore methods through Deref
1281        let issue_id = generate_issue_id();
1282        let actor = [1u8; 16];
1283        let event = make_event(
1284            issue_id,
1285            actor,
1286            1000,
1287            EventKind::IssueCreated {
1288                title: "Test".to_string(),
1289                body: "Body".to_string(),
1290                labels: vec![],
1291            },
1292        );
1293
1294        // These calls go through Deref to GriteStore
1295        store.insert_event(&event).unwrap();
1296        let retrieved = store.get_event(&event.event_id).unwrap();
1297        assert!(retrieved.is_some());
1298    }
1299}