Skip to main content

radicle/node/seed/
store.rs

1#![allow(clippy::type_complexity)]
2use std::str::FromStr;
3
4use localtime::LocalTime;
5use sqlite as sql;
6use thiserror::Error;
7
8use crate::git::Oid;
9use crate::node::address;
10use crate::node::address::Store as _;
11use crate::node::NodeId;
12use crate::node::{seed::SyncedSeed, Database, SyncedAt};
13use crate::prelude::{RepoId, Timestamp};
14
15#[derive(Error, Debug)]
16pub enum Error {
17    /// An Internal error.
18    #[error("internal error: {0}")]
19    Internal(#[from] sql::Error),
20    /// An address store error.
21    #[error("address store error: {0}")]
22    Addresses(#[from] address::Error),
23}
24
25/// Seed store.
26///
27/// Used to store seed sync statuses.
28pub trait Store: address::Store {
29    /// Mark a repo as synced on the given node.
30    fn synced(
31        &mut self,
32        rid: &RepoId,
33        nid: &NodeId,
34        at: Oid,
35        timestamp: Timestamp,
36    ) -> Result<bool, Error>;
37    /// Get the repos seeded by the given node.
38    fn seeded_by(
39        &self,
40        nid: &NodeId,
41    ) -> Result<Box<dyn Iterator<Item = Result<(RepoId, SyncedAt), Error>> + '_>, Error>;
42    /// Get nodes that have synced the given repo.
43    fn seeds_for(
44        &self,
45        rid: &RepoId,
46    ) -> Result<Box<dyn Iterator<Item = Result<SyncedSeed, Error>> + '_>, Error>;
47}
48
49impl Store for Database {
50    fn synced(
51        &mut self,
52        rid: &RepoId,
53        nid: &NodeId,
54        at: Oid,
55        timestamp: Timestamp,
56    ) -> Result<bool, Error> {
57        let mut stmt = self.db.prepare(
58            "INSERT INTO `repo-sync-status` (repo, node, head, timestamp)
59             VALUES (?1, ?2, ?3, ?4)
60             ON CONFLICT DO UPDATE
61             SET head = ?3, timestamp = ?4
62             WHERE timestamp < ?4 AND head <> ?3",
63        )?;
64        stmt.bind((1, rid))?;
65        stmt.bind((2, nid))?;
66        stmt.bind((3, at.to_string().as_str()))?;
67        stmt.bind((4, &timestamp))?;
68        stmt.next()?;
69
70        Ok(self.db.change_count() > 0)
71    }
72
73    fn seeds_for(
74        &self,
75        rid: &RepoId,
76    ) -> Result<Box<dyn Iterator<Item = Result<SyncedSeed, Error>> + '_>, Error> {
77        let mut stmt = self.db.prepare(
78            "SELECT node, head, timestamp
79             FROM `repo-sync-status`
80             WHERE repo = ?",
81        )?;
82        stmt.bind((1, rid))?;
83
84        Ok(Box::new(stmt.into_iter().map(|row| {
85            let row = row?;
86            let nid = row.try_read::<NodeId, _>("node")?;
87            let oid = row.try_read::<&str, _>("head")?;
88            let oid = Oid::from_str(oid).map_err(|e| {
89                Error::Internal(sql::Error {
90                    code: None,
91                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
92                })
93            })?;
94            let timestamp = row.try_read::<i64, _>("timestamp")?;
95            let timestamp = LocalTime::from_millis(timestamp as u128);
96            let addresses = self.addresses_of(&nid)?;
97
98            Ok(SyncedSeed {
99                nid,
100                addresses,
101                synced_at: SyncedAt { oid, timestamp },
102            })
103        })))
104    }
105
106    fn seeded_by(
107        &self,
108        nid: &NodeId,
109    ) -> Result<Box<dyn Iterator<Item = Result<(RepoId, SyncedAt), Error>> + '_>, Error> {
110        let mut stmt = self.db.prepare(
111            "SELECT repo, head, timestamp
112             FROM `repo-sync-status`
113             WHERE node = ?",
114        )?;
115        stmt.bind((1, nid))?;
116
117        Ok(Box::new(stmt.into_iter().map(|row| {
118            let row = row?;
119            let rid = row.try_read::<RepoId, _>("repo")?;
120            let oid = row.try_read::<&str, _>("head")?;
121            let oid = Oid::from_str(oid).map_err(|e| {
122                Error::Internal(sql::Error {
123                    code: None,
124                    message: Some(format!("sql: invalid oid '{oid}': {e}")),
125                })
126            })?;
127            let timestamp = row.try_read::<i64, _>("timestamp")?;
128            let timestamp = LocalTime::from_millis(timestamp as u128);
129
130            Ok((rid, SyncedAt { oid, timestamp }))
131        })))
132    }
133}