radicle/node/seed/
store.rs1#![allow(clippy::type_complexity)]
2use std::str::FromStr;
3
4use localtime::LocalTime;
5use sqlite as sql;
6use thiserror::Error;
7
8use crate::git::Oid;
9use crate::node::address;
10use crate::node::address::Store as _;
11use crate::node::NodeId;
12use crate::node::{seed::SyncedSeed, Database, SyncedAt};
13use crate::prelude::{RepoId, Timestamp};
14
15#[derive(Error, Debug)]
16pub enum Error {
17 #[error("internal error: {0}")]
19 Internal(#[from] sql::Error),
20 #[error("address store error: {0}")]
22 Addresses(#[from] address::Error),
23}
24
25pub trait Store: address::Store {
29 fn synced(
31 &mut self,
32 rid: &RepoId,
33 nid: &NodeId,
34 at: Oid,
35 timestamp: Timestamp,
36 ) -> Result<bool, Error>;
37 fn seeded_by(
39 &self,
40 nid: &NodeId,
41 ) -> Result<Box<dyn Iterator<Item = Result<(RepoId, SyncedAt), Error>> + '_>, Error>;
42 fn seeds_for(
44 &self,
45 rid: &RepoId,
46 ) -> Result<Box<dyn Iterator<Item = Result<SyncedSeed, Error>> + '_>, Error>;
47}
48
49impl Store for Database {
50 fn synced(
51 &mut self,
52 rid: &RepoId,
53 nid: &NodeId,
54 at: Oid,
55 timestamp: Timestamp,
56 ) -> Result<bool, Error> {
57 let mut stmt = self.db.prepare(
58 "INSERT INTO `repo-sync-status` (repo, node, head, timestamp)
59 VALUES (?1, ?2, ?3, ?4)
60 ON CONFLICT DO UPDATE
61 SET head = ?3, timestamp = ?4
62 WHERE timestamp < ?4 AND head <> ?3",
63 )?;
64 stmt.bind((1, rid))?;
65 stmt.bind((2, nid))?;
66 stmt.bind((3, at.to_string().as_str()))?;
67 stmt.bind((4, ×tamp))?;
68 stmt.next()?;
69
70 Ok(self.db.change_count() > 0)
71 }
72
73 fn seeds_for(
74 &self,
75 rid: &RepoId,
76 ) -> Result<Box<dyn Iterator<Item = Result<SyncedSeed, Error>> + '_>, Error> {
77 let mut stmt = self.db.prepare(
78 "SELECT node, head, timestamp
79 FROM `repo-sync-status`
80 WHERE repo = ?",
81 )?;
82 stmt.bind((1, rid))?;
83
84 Ok(Box::new(stmt.into_iter().map(|row| {
85 let row = row?;
86 let nid = row.try_read::<NodeId, _>("node")?;
87 let oid = row.try_read::<&str, _>("head")?;
88 let oid = Oid::from_str(oid).map_err(|e| {
89 Error::Internal(sql::Error {
90 code: None,
91 message: Some(format!("sql: invalid oid '{oid}': {e}")),
92 })
93 })?;
94 let timestamp = row.try_read::<i64, _>("timestamp")?;
95 let timestamp = LocalTime::from_millis(timestamp as u128);
96 let addresses = self.addresses_of(&nid)?;
97
98 Ok(SyncedSeed {
99 nid,
100 addresses,
101 synced_at: SyncedAt { oid, timestamp },
102 })
103 })))
104 }
105
106 fn seeded_by(
107 &self,
108 nid: &NodeId,
109 ) -> Result<Box<dyn Iterator<Item = Result<(RepoId, SyncedAt), Error>> + '_>, Error> {
110 let mut stmt = self.db.prepare(
111 "SELECT repo, head, timestamp
112 FROM `repo-sync-status`
113 WHERE node = ?",
114 )?;
115 stmt.bind((1, nid))?;
116
117 Ok(Box::new(stmt.into_iter().map(|row| {
118 let row = row?;
119 let rid = row.try_read::<RepoId, _>("repo")?;
120 let oid = row.try_read::<&str, _>("head")?;
121 let oid = Oid::from_str(oid).map_err(|e| {
122 Error::Internal(sql::Error {
123 code: None,
124 message: Some(format!("sql: invalid oid '{oid}': {e}")),
125 })
126 })?;
127 let timestamp = row.try_read::<i64, _>("timestamp")?;
128 let timestamp = LocalTime::from_millis(timestamp as u128);
129
130 Ok((rid, SyncedAt { oid, timestamp }))
131 })))
132 }
133}