Skip to main content

toolhub_storage/
sources.rs

1//! `sources` table accessors.
2//!
3//! Phase 3 added the bare `upsert` so the MCP `add_source` stub could record
4//! a row. Phase 5 added `upsert_full` (writes `last_commit_sha`), plus `list`,
5//! `get`, and `delete` so `toolhub update`/`remove` can drive the re-pull
6//! lifecycle.
7
8use chrono::{DateTime, Utc};
9use rusqlite::{Connection, params};
10
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct SourceRow {
13    pub id: String,
14    pub r#type: String,
15    pub location: String,
16    pub last_pulled_at: Option<DateTime<Utc>>,
17    pub last_commit_sha: Option<String>,
18}
19
20pub fn upsert(conn: &Connection, id: &str, type_: &str, location: &str) -> anyhow::Result<()> {
21    let now = Utc::now().to_rfc3339();
22    conn.execute(
23        "INSERT INTO sources (id, type, location, last_pulled_at, last_commit_sha)
24         VALUES (?, ?, ?, ?, NULL)
25         ON CONFLICT(id) DO UPDATE SET
26             type = excluded.type,
27             location = excluded.location,
28             last_pulled_at = excluded.last_pulled_at",
29        params![id, type_, location, now],
30    )?;
31    Ok(())
32}
33
34/// Insert or update a source with full provenance — the timestamp the user
35/// pulled it and the upstream commit sha. `update` uses this on every re-pull;
36/// `add` uses it on the first ingestion.
37pub fn upsert_full(
38    conn: &Connection,
39    id: &str,
40    type_: &str,
41    location: &str,
42    last_pulled_at: DateTime<Utc>,
43    last_commit_sha: Option<&str>,
44) -> anyhow::Result<()> {
45    conn.execute(
46        "INSERT INTO sources (id, type, location, last_pulled_at, last_commit_sha)
47         VALUES (?, ?, ?, ?, ?)
48         ON CONFLICT(id) DO UPDATE SET
49             type = excluded.type,
50             location = excluded.location,
51             last_pulled_at = excluded.last_pulled_at,
52             last_commit_sha = excluded.last_commit_sha",
53        params![
54            id,
55            type_,
56            location,
57            last_pulled_at.to_rfc3339(),
58            last_commit_sha
59        ],
60    )?;
61    Ok(())
62}
63
64fn parse_ts(s: &str) -> anyhow::Result<DateTime<Utc>> {
65    Ok(DateTime::parse_from_rfc3339(s)?.with_timezone(&Utc))
66}
67
68pub fn list(conn: &Connection) -> anyhow::Result<Vec<SourceRow>> {
69    let mut stmt = conn.prepare(
70        "SELECT id, type, location, last_pulled_at, last_commit_sha
71         FROM sources ORDER BY id",
72    )?;
73    let rows = stmt
74        .query_map([], |row| {
75            Ok((
76                row.get::<_, String>(0)?,
77                row.get::<_, String>(1)?,
78                row.get::<_, String>(2)?,
79                row.get::<_, Option<String>>(3)?,
80                row.get::<_, Option<String>>(4)?,
81            ))
82        })?
83        .collect::<Result<Vec<_>, _>>()?;
84    let mut out = Vec::with_capacity(rows.len());
85    for (id, ty, loc, pulled, sha) in rows {
86        out.push(SourceRow {
87            id,
88            r#type: ty,
89            location: loc,
90            last_pulled_at: pulled.as_deref().map(parse_ts).transpose()?,
91            last_commit_sha: sha,
92        });
93    }
94    Ok(out)
95}
96
97pub fn get(conn: &Connection, id: &str) -> anyhow::Result<Option<SourceRow>> {
98    let mut stmt = conn.prepare(
99        "SELECT id, type, location, last_pulled_at, last_commit_sha
100         FROM sources WHERE id = ?",
101    )?;
102    let mut rows = stmt.query(params![id])?;
103    let Some(row) = rows.next()? else {
104        return Ok(None);
105    };
106    let pulled: Option<String> = row.get(3)?;
107    Ok(Some(SourceRow {
108        id: row.get(0)?,
109        r#type: row.get(1)?,
110        location: row.get(2)?,
111        last_pulled_at: pulled.as_deref().map(parse_ts).transpose()?,
112        last_commit_sha: row.get(4)?,
113    }))
114}
115
116/// Delete a source row by id. Returns the number of rows deleted (0 or 1).
117/// Caller is responsible for removing tools whose `source_repo` matches.
118pub fn delete(conn: &Connection, id: &str) -> anyhow::Result<usize> {
119    let n = conn.execute("DELETE FROM sources WHERE id = ?", params![id])?;
120    Ok(n)
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126    use crate::open;
127
128    #[test]
129    fn upsert_inserts_then_updates_in_place() {
130        let dir = tempfile::tempdir().unwrap();
131        let conn = open(&dir.path().join("t.sqlite")).unwrap();
132        upsert(&conn, "gh:foo/bar", "github", "https://github.com/foo/bar").unwrap();
133        upsert(&conn, "gh:foo/bar", "github", "https://github.com/foo/bar2").unwrap();
134
135        let count: i64 = conn
136            .query_row("SELECT COUNT(*) FROM sources", [], |r| r.get(0))
137            .unwrap();
138        assert_eq!(count, 1);
139
140        let loc: String = conn
141            .query_row(
142                "SELECT location FROM sources WHERE id = ?",
143                params!["gh:foo/bar"],
144                |r| r.get(0),
145            )
146            .unwrap();
147        assert_eq!(loc, "https://github.com/foo/bar2");
148    }
149
150    #[test]
151    fn upsert_full_round_trip_through_get_and_list() {
152        let dir = tempfile::tempdir().unwrap();
153        let conn = open(&dir.path().join("t.sqlite")).unwrap();
154        let pulled = Utc::now();
155        upsert_full(
156            &conn,
157            "gh:a/b",
158            "github",
159            "https://github.com/a/b",
160            pulled,
161            Some("deadbeefcafe"),
162        )
163        .unwrap();
164
165        let row = get(&conn, "gh:a/b").unwrap().unwrap();
166        assert_eq!(row.r#type, "github");
167        assert_eq!(row.location, "https://github.com/a/b");
168        assert_eq!(row.last_commit_sha.as_deref(), Some("deadbeefcafe"));
169        assert!(row.last_pulled_at.is_some());
170
171        upsert_full(
172            &conn,
173            "gh:a/b",
174            "github",
175            "https://github.com/a/b",
176            pulled,
177            Some("newsha"),
178        )
179        .unwrap();
180        let row2 = get(&conn, "gh:a/b").unwrap().unwrap();
181        assert_eq!(row2.last_commit_sha.as_deref(), Some("newsha"));
182
183        upsert_full(
184            &conn,
185            "gh:c/d",
186            "github",
187            "https://github.com/c/d",
188            pulled,
189            None,
190        )
191        .unwrap();
192        let all = list(&conn).unwrap();
193        assert_eq!(all.len(), 2);
194        assert_eq!(all[0].id, "gh:a/b");
195        assert_eq!(all[1].id, "gh:c/d");
196    }
197
198    #[test]
199    fn delete_returns_count_and_get_returns_none() {
200        let dir = tempfile::tempdir().unwrap();
201        let conn = open(&dir.path().join("t.sqlite")).unwrap();
202        upsert(&conn, "gh:foo/bar", "github", "https://github.com/foo/bar").unwrap();
203        assert_eq!(delete(&conn, "gh:foo/bar").unwrap(), 1);
204        assert_eq!(delete(&conn, "gh:foo/bar").unwrap(), 0);
205        assert!(get(&conn, "gh:foo/bar").unwrap().is_none());
206    }
207}