Skip to main content

orbok_db/repo/
sources.rs

1//! Source repository (RFC-002 §7.2, RFC-003).
2
3use crate::catalog::{Catalog, db_err};
4use orbok_core::{
5    HiddenFilePolicy, IndexMode, OrbokError, OrbokResult, PersistenceMode, SourceId, SourceStatus,
6    SourceType, SymlinkPolicy, now_iso8601,
7};
8use rusqlite::{Row, params};
9
10/// A registered source (persistent catalog data — never deleted by
11/// ordinary cleanup, RFC-001 §7.1).
12#[derive(Debug, Clone)]
13pub struct SourceRecord {
14    pub source_id: SourceId,
15    pub source_type: SourceType,
16    pub persistence_mode: PersistenceMode,
17    pub display_name: Option<String>,
18    pub original_path: String,
19    pub canonical_path: String,
20    pub status: SourceStatus,
21    pub index_mode: IndexMode,
22    pub include_patterns: Vec<String>,
23    pub exclude_patterns: Vec<String>,
24    pub hidden_file_policy: HiddenFilePolicy,
25    pub symlink_policy: SymlinkPolicy,
26    pub max_file_size_bytes: Option<u64>,
27    pub created_at: String,
28    pub updated_at: String,
29    pub last_scanned_at: Option<String>,
30}
31
32/// Parameters for registering a new source (RFC-003 §9.1).
33#[derive(Debug, Clone)]
34pub struct NewSource {
35    pub source_type: SourceType,
36    pub persistence_mode: PersistenceMode,
37    pub display_name: Option<String>,
38    pub original_path: String,
39    pub canonical_path: String,
40    pub index_mode: IndexMode,
41    pub include_patterns: Vec<String>,
42    pub exclude_patterns: Vec<String>,
43    pub hidden_file_policy: HiddenFilePolicy,
44    pub symlink_policy: SymlinkPolicy,
45    pub max_file_size_bytes: Option<u64>,
46}
47
48/// Repository over the `sources` table.
49pub struct SourceRepository<'a> {
50    catalog: &'a Catalog,
51}
52
/// Column list shared by every `SELECT` on `sources`.
///
/// The order is load-bearing: `row_to_record` reads columns by the
/// positional index noted beside each name.
const COLUMNS: &str = concat!(
    "source_id, ",             // 0
    "source_type, ",           // 1
    "persistence_mode, ",      // 2
    "display_name, ",          // 3
    "original_path, ",         // 4
    "canonical_path, ",        // 5
    "status, ",                // 6
    "index_mode, ",            // 7
    "include_patterns_json, ", // 8
    "exclude_patterns_json, ", // 9
    "hidden_file_policy, ",    // 10
    "symlink_policy, ",        // 11
    "max_file_size_bytes, ",   // 12
    "created_at, ",            // 13
    "updated_at, ",            // 14
    "last_scanned_at",         // 15
);
57
58impl<'a> SourceRepository<'a> {
59    pub fn new(catalog: &'a Catalog) -> Self {
60        Self { catalog }
61    }
62
63    /// Register a source as Active. The transaction requirement of
64    /// RFC-002 §9 item 1 is satisfied by the single-statement insert.
65    pub fn insert(&self, new: NewSource) -> OrbokResult<SourceRecord> {
66        let id = SourceId::generate();
67        let now = now_iso8601();
68        let conn = self.catalog.lock();
69        conn.execute(
70            "INSERT INTO sources (source_id, source_type, persistence_mode, display_name, \
71             original_path, canonical_path, status, index_mode, include_patterns_json, \
72             exclude_patterns_json, hidden_file_policy, symlink_policy, max_file_size_bytes, \
73             created_at, updated_at) \
74             VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?14)",
75            params![
76                id.as_str(),
77                new.source_type.as_str(),
78                new.persistence_mode.as_str(),
79                new.display_name,
80                new.original_path,
81                new.canonical_path,
82                SourceStatus::Active.as_str(),
83                new.index_mode.as_str(),
84                serde_json::to_string(&new.include_patterns).unwrap_or_default(),
85                serde_json::to_string(&new.exclude_patterns).unwrap_or_default(),
86                new.hidden_file_policy.as_str(),
87                new.symlink_policy.as_str(),
88                new.max_file_size_bytes.map(|v| v as i64),
89                now,
90            ],
91        )
92        .map_err(db_err)?;
93        drop(conn);
94        self.get(&id)?.ok_or(OrbokError::SourceNotFound)
95    }
96
97    /// Fetch one source by id.
98    pub fn get(&self, id: &SourceId) -> OrbokResult<Option<SourceRecord>> {
99        let conn = self.catalog.lock();
100        let mut stmt = conn
101            .prepare(&format!(
102                "SELECT {COLUMNS} FROM sources WHERE source_id = ?1"
103            ))
104            .map_err(db_err)?;
105        let mut rows = stmt
106            .query_map(params![id.as_str()], row_to_record)
107            .map_err(db_err)?;
108        match rows.next() {
109            Some(r) => Ok(Some(r.map_err(db_err)??)),
110            None => Ok(None),
111        }
112    }
113
114    /// All sources except Removed, newest first.
115    pub fn list(&self) -> OrbokResult<Vec<SourceRecord>> {
116        self.query_records(&format!(
117            "SELECT {COLUMNS} FROM sources WHERE status != 'removed' ORDER BY created_at DESC"
118        ))
119    }
120
121    /// Sources eligible for scanning (Active only, RFC-004 §10).
122    pub fn list_active(&self) -> OrbokResult<Vec<SourceRecord>> {
123        self.query_records(&format!(
124            "SELECT {COLUMNS} FROM sources WHERE status = 'active' ORDER BY created_at"
125        ))
126    }
127
128    fn query_records(&self, sql: &str) -> OrbokResult<Vec<SourceRecord>> {
129        let conn = self.catalog.lock();
130        let mut stmt = conn.prepare(sql).map_err(db_err)?;
131        let rows = stmt.query_map([], row_to_record).map_err(db_err)?;
132        let mut out = Vec::new();
133        for row in rows {
134            out.push(row.map_err(db_err)??);
135        }
136        Ok(out)
137    }
138
139    /// Update status (pause/resume/missing/permission_denied/removed).
140    pub fn set_status(&self, id: &SourceId, status: SourceStatus) -> OrbokResult<()> {
141        let conn = self.catalog.lock();
142        let n = conn
143            .execute(
144                "UPDATE sources SET status = ?2, updated_at = ?3 WHERE source_id = ?1",
145                params![id.as_str(), status.as_str(), now_iso8601()],
146            )
147            .map_err(db_err)?;
148        if n == 0 {
149            return Err(OrbokError::SourceNotFound);
150        }
151        Ok(())
152    }
153
154    /// Record a completed scan.
155    pub fn touch_scanned(&self, id: &SourceId) -> OrbokResult<()> {
156        let now = now_iso8601();
157        let conn = self.catalog.lock();
158        conn.execute(
159            "UPDATE sources SET last_scanned_at = ?2, updated_at = ?2 WHERE source_id = ?1",
160            params![id.as_str(), now],
161        )
162        .map_err(db_err)?;
163        Ok(())
164    }
165
166    /// Remove-source option 3 (RFC-003 §10.3): delete the source row and
167    /// let foreign keys cascade through files → extraction → chunks →
168    /// indexes. Source files on disk are never touched.
169    pub fn delete_with_all_data(&self, id: &SourceId) -> OrbokResult<()> {
170        let conn = self.catalog.lock();
171        conn.execute(
172            "DELETE FROM sources WHERE source_id = ?1",
173            params![id.as_str()],
174        )
175        .map_err(db_err)?;
176        Ok(())
177    }
178}
179
180fn row_to_record(row: &Row<'_>) -> rusqlite::Result<OrbokResult<SourceRecord>> {
181    let parse_patterns = |s: Option<String>| -> Vec<String> {
182        s.and_then(|s| serde_json::from_str(&s).ok())
183            .unwrap_or_default()
184    };
185    let source_type: String = row.get(1)?;
186    let persistence: String = row.get(2)?;
187    let status: String = row.get(6)?;
188    let index_mode: String = row.get(7)?;
189    let hidden: String = row.get(10)?;
190    let symlink: String = row.get(11)?;
191    let max_size: Option<i64> = row.get(12)?;
192
193    Ok((|| {
194        Ok(SourceRecord {
195            source_id: SourceId::from_string(row.get::<_, String>(0).map_err(db_err)?),
196            source_type: SourceType::parse(&source_type)?,
197            persistence_mode: PersistenceMode::parse(&persistence)?,
198            display_name: row.get(3).map_err(db_err)?,
199            original_path: row.get(4).map_err(db_err)?,
200            canonical_path: row.get(5).map_err(db_err)?,
201            status: SourceStatus::parse(&status)?,
202            index_mode: IndexMode::parse(&index_mode)?,
203            include_patterns: parse_patterns(row.get(8).map_err(db_err)?),
204            exclude_patterns: parse_patterns(row.get(9).map_err(db_err)?),
205            hidden_file_policy: HiddenFilePolicy::parse(&hidden)?,
206            symlink_policy: SymlinkPolicy::parse(&symlink)?,
207            max_file_size_bytes: max_size.map(|v| v as u64),
208            created_at: row.get(13).map_err(db_err)?,
209            updated_at: row.get(14).map_err(db_err)?,
210            last_scanned_at: row.get(15).map_err(db_err)?,
211        })
212    })())
213}