// File: orbok_db/repo/sources.rs

//! Source repository (RFC-002 §7.2, RFC-003).

use crate::catalog::{Catalog, db_err};
use orbok_core::{
    HiddenFilePolicy, IndexMode, OrbokError, OrbokResult, PersistenceMode, SourceId, SourceStatus,
    SourceType, SymlinkPolicy, now_iso8601,
};
use rusqlite::{OptionalExtension, Row, params};

10/// A registered source (persistent catalog data — never deleted by
11/// ordinary cleanup, RFC-001 §7.1).
12#[derive(Debug, Clone)]
13pub struct SourceRecord {
14    pub source_id: SourceId,
15    pub source_type: SourceType,
16    pub persistence_mode: PersistenceMode,
17    pub display_name: Option<String>,
18    pub original_path: String,
19    pub canonical_path: String,
20    pub status: SourceStatus,
21    pub index_mode: IndexMode,
22    pub include_patterns: Vec<String>,
23    pub exclude_patterns: Vec<String>,
24    pub hidden_file_policy: HiddenFilePolicy,
25    pub symlink_policy: SymlinkPolicy,
26    pub max_file_size_bytes: Option<u64>,
27    pub created_at: String,
28    pub updated_at: String,
29    pub last_scanned_at: Option<String>,
30}
31
32/// Parameters for registering a new source (RFC-003 §9.1).
33#[derive(Debug, Clone)]
34pub struct NewSource {
35    pub source_type: SourceType,
36    pub persistence_mode: PersistenceMode,
37    pub display_name: Option<String>,
38    pub original_path: String,
39    pub canonical_path: String,
40    pub index_mode: IndexMode,
41    pub include_patterns: Vec<String>,
42    pub exclude_patterns: Vec<String>,
43    pub hidden_file_policy: HiddenFilePolicy,
44    pub symlink_policy: SymlinkPolicy,
45    pub max_file_size_bytes: Option<u64>,
46}
47
48/// Repository over the `sources` table.
49pub struct SourceRepository<'a> {
50    catalog: &'a Catalog,
51}
52
/// Column list shared by every `SELECT` on `sources`.
///
/// The order is significant: `row_to_record` reads the columns by position
/// (0 = `source_id` … 15 = `last_scanned_at`), so any change here must be
/// mirrored there.
const COLUMNS: &str = "source_id, source_type, persistence_mode, display_name, original_path, \
     canonical_path, status, index_mode, include_patterns_json, exclude_patterns_json, \
     hidden_file_policy, symlink_policy, max_file_size_bytes, created_at, updated_at, \
     last_scanned_at";

58impl<'a> SourceRepository<'a> {
59    pub fn new(catalog: &'a Catalog) -> Self {
60        Self { catalog }
61    }
62
63    /// Register a source as Active. The transaction requirement of
64    /// RFC-002 §9 item 1 is satisfied by the single-statement insert.
65    pub fn insert(&self, new: NewSource) -> OrbokResult<SourceRecord> {
66        let id = SourceId::generate();
67        let now = now_iso8601();
68        let conn = self.catalog.lock();
69        conn.execute(
70            "INSERT INTO sources (source_id, source_type, persistence_mode, display_name, \
71             original_path, canonical_path, status, index_mode, include_patterns_json, \
72             exclude_patterns_json, hidden_file_policy, symlink_policy, max_file_size_bytes, \
73             created_at, updated_at) \
74             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?14)",
75            params![
76                id.as_str(),
77                new.source_type.as_str(),
78                new.persistence_mode.as_str(),
79                new.display_name,
80                new.original_path,
81                new.canonical_path,
82                SourceStatus::Active.as_str(),
83                new.index_mode.as_str(),
84                serde_json::to_string(&new.include_patterns).unwrap_or_default(),
85                serde_json::to_string(&new.exclude_patterns).unwrap_or_default(),
86                new.hidden_file_policy.as_str(),
87                new.symlink_policy.as_str(),
88                new.max_file_size_bytes.map(|v| v as i64),
89                now,
90            ],
91        )
92        .map_err(db_err)?;
93        drop(conn);
94        self.get(&id)?.ok_or(OrbokError::SourceNotFound)
95    }
96
97    /// Fetch one source by id.
98    pub fn get(&self, id: &SourceId) -> OrbokResult<Option<SourceRecord>> {
99        let conn = self.catalog.lock();
100        let mut stmt = conn
101            .prepare(&format!("SELECT {COLUMNS} FROM sources WHERE source_id = ?1"))
102            .map_err(db_err)?;
103        let mut rows = stmt
104            .query_map(params![id.as_str()], row_to_record)
105            .map_err(db_err)?;
106        match rows.next() {
107            Some(r) => Ok(Some(r.map_err(db_err)??)),
108            None => Ok(None),
109        }
110    }
111
112    /// All sources except Removed, newest first.
113    pub fn list(&self) -> OrbokResult<Vec<SourceRecord>> {
114        self.query_records(&format!(
115            "SELECT {COLUMNS} FROM sources WHERE status != 'removed' ORDER BY created_at DESC"
116        ))
117    }
118
119    /// Sources eligible for scanning (Active only, RFC-004 §10).
120    pub fn list_active(&self) -> OrbokResult<Vec<SourceRecord>> {
121        self.query_records(&format!(
122            "SELECT {COLUMNS} FROM sources WHERE status = 'active' ORDER BY created_at"
123        ))
124    }
125
126    fn query_records(&self, sql: &str) -> OrbokResult<Vec<SourceRecord>> {
127        let conn = self.catalog.lock();
128        let mut stmt = conn.prepare(sql).map_err(db_err)?;
129        let rows = stmt.query_map([], row_to_record).map_err(db_err)?;
130        let mut out = Vec::new();
131        for row in rows {
132            out.push(row.map_err(db_err)??);
133        }
134        Ok(out)
135    }
136
137    /// Update status (pause/resume/missing/permission_denied/removed).
138    pub fn set_status(&self, id: &SourceId, status: SourceStatus) -> OrbokResult<()> {
139        let conn = self.catalog.lock();
140        let n = conn
141            .execute(
142                "UPDATE sources SET status = ?2, updated_at = ?3 WHERE source_id = ?1",
143                params![id.as_str(), status.as_str(), now_iso8601()],
144            )
145            .map_err(db_err)?;
146        if n == 0 {
147            return Err(OrbokError::SourceNotFound);
148        }
149        Ok(())
150    }
151
152    /// Record a completed scan.
153    pub fn touch_scanned(&self, id: &SourceId) -> OrbokResult<()> {
154        let now = now_iso8601();
155        let conn = self.catalog.lock();
156        conn.execute(
157            "UPDATE sources SET last_scanned_at = ?2, updated_at = ?2 WHERE source_id = ?1",
158            params![id.as_str(), now],
159        )
160        .map_err(db_err)?;
161        Ok(())
162    }
163
164    /// Remove-source option 3 (RFC-003 §10.3): delete the source row and
165    /// let foreign keys cascade through files → extraction → chunks →
166    /// indexes. Source files on disk are never touched.
167    pub fn delete_with_all_data(&self, id: &SourceId) -> OrbokResult<()> {
168        let conn = self.catalog.lock();
169        conn.execute("DELETE FROM sources WHERE source_id = ?1", params![id.as_str()])
170            .map_err(db_err)?;
171        Ok(())
172    }
173}
174
175fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<SourceRecord>> {
176    let parse_patterns = |s: Option<String>| -> Vec<String> {
177        s.and_then(|s| serde_json::from_str(&s).ok()).unwrap_or_default()
178    };
179    let source_type: String = row.get(1)?;
180    let persistence: String = row.get(2)?;
181    let status: String = row.get(6)?;
182    let index_mode: String = row.get(7)?;
183    let hidden: String = row.get(10)?;
184    let symlink: String = row.get(11)?;
185    let max_size: Option<i64> = row.get(12)?;
186
187    Ok((|| {
188        Ok(SourceRecord {
189            source_id: SourceId::from_string(row.get::<_, String>(0).map_err(db_err)?),
190            source_type: SourceType::parse(&source_type)?,
191            persistence_mode: PersistenceMode::parse(&persistence)?,
192            display_name: row.get(3).map_err(db_err)?,
193            original_path: row.get(4).map_err(db_err)?,
194            canonical_path: row.get(5).map_err(db_err)?,
195            status: SourceStatus::parse(&status)?,
196            index_mode: IndexMode::parse(&index_mode)?,
197            include_patterns: parse_patterns(row.get(8).map_err(db_err)?),
198            exclude_patterns: parse_patterns(row.get(9).map_err(db_err)?),
199            hidden_file_policy: HiddenFilePolicy::parse(&hidden)?,
200            symlink_policy: SymlinkPolicy::parse(&symlink)?,
201            max_file_size_bytes: max_size.map(|v| v as u64),
202            created_at: row.get(13).map_err(db_err)?,
203            updated_at: row.get(14).map_err(db_err)?,
204            last_scanned_at: row.get(15).map_err(db_err)?,
205        })
206    })())
207}