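//! Sync transport layer (`citadel_sync/transport.rs`): the transport error
//! type, the `SyncTransport` trait, and a `TreeReader` backed by a remote peer.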

1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
6/// Errors from sync transport operations.
7#[derive(Debug, thiserror::Error)]
8pub enum SyncError {
9    #[error("transport I/O error: {0}")]
10    Io(#[from] std::io::Error),
11
12    #[error("protocol error: {0}")]
13    Protocol(#[from] crate::protocol::ProtocolError),
14
15    #[error("unexpected message: expected {expected}, got {actual}")]
16    UnexpectedMessage { expected: String, actual: String },
17
18    #[error("remote error: {0}")]
19    Remote(String),
20
21    #[error("transport closed")]
22    Closed,
23
24    #[error("database error: {0}")]
25    Database(#[from] citadel_core::Error),
26
27    #[error("patch error: {0}")]
28    Patch(#[from] crate::patch::PatchError),
29
30    #[error("handshake failed: {0}")]
31    Handshake(String),
32}
33
34/// Bidirectional message transport for sync protocol.
35/// Uses `&self` with interior mutability for shared access during Merkle diff.
36pub trait SyncTransport: Send {
37    fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;
38
39    fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;
40
41    fn close(&self) -> std::result::Result<(), SyncError>;
42}
43
44/// `TreeReader` that reads from a remote database via `SyncTransport`.
45///
46/// Sends `DigestRequest`/`EntriesRequest` messages and blocks waiting
47/// for responses. Used by `merkle_diff()` to compare trees across nodes.
48pub struct RemoteTreeReader<'a> {
49    transport: &'a dyn SyncTransport,
50    root_page: PageId,
51    root_hash: MerkleHash,
52}
53
54impl<'a> RemoteTreeReader<'a> {
55    pub fn new(transport: &'a dyn SyncTransport, root_page: PageId, root_hash: MerkleHash) -> Self {
56        Self {
57            transport,
58            root_page,
59            root_hash,
60        }
61    }
62}
63
64impl TreeReader for RemoteTreeReader<'_> {
65    fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
66        Ok((self.root_page, self.root_hash))
67    }
68
69    fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
70        self.transport
71            .send(&SyncMessage::DigestRequest {
72                page_ids: vec![page_id],
73            })
74            .map_err(sync_to_core)?;
75
76        match self.transport.recv().map_err(sync_to_core)? {
77            SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
78                Ok(digests.remove(0))
79            }
80            SyncMessage::DigestResponse { .. } => Err(citadel_core::Error::Io(
81                std::io::Error::new(std::io::ErrorKind::InvalidData, "empty digest response"),
82            )),
83            SyncMessage::Error { message } => {
84                Err(citadel_core::Error::Io(std::io::Error::other(message)))
85            }
86            other => Err(citadel_core::Error::Io(std::io::Error::new(
87                std::io::ErrorKind::InvalidData,
88                format!("expected DigestResponse, got {}", msg_name(&other)),
89            ))),
90        }
91    }
92
93    fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
94        self.transport
95            .send(&SyncMessage::EntriesRequest {
96                page_ids: vec![page_id],
97            })
98            .map_err(sync_to_core)?;
99
100        match self.transport.recv().map_err(sync_to_core)? {
101            SyncMessage::EntriesResponse { entries } => Ok(entries),
102            SyncMessage::Error { message } => {
103                Err(citadel_core::Error::Io(std::io::Error::other(message)))
104            }
105            other => Err(citadel_core::Error::Io(std::io::Error::new(
106                std::io::ErrorKind::InvalidData,
107                format!("expected EntriesResponse, got {}", msg_name(&other)),
108            ))),
109        }
110    }
111}
112
113fn sync_to_core(e: SyncError) -> citadel_core::Error {
114    citadel_core::Error::Io(std::io::Error::other(e.to_string()))
115}
116
117pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
118    match msg {
119        SyncMessage::Hello { .. } => "Hello",
120        SyncMessage::HelloAck { .. } => "HelloAck",
121        SyncMessage::DigestRequest { .. } => "DigestRequest",
122        SyncMessage::DigestResponse { .. } => "DigestResponse",
123        SyncMessage::EntriesRequest { .. } => "EntriesRequest",
124        SyncMessage::EntriesResponse { .. } => "EntriesResponse",
125        SyncMessage::PatchData { .. } => "PatchData",
126        SyncMessage::PatchAck { .. } => "PatchAck",
127        SyncMessage::Done => "Done",
128        SyncMessage::Error { .. } => "Error",
129        SyncMessage::PullRequest => "PullRequest",
130        SyncMessage::PullResponse { .. } => "PullResponse",
131        SyncMessage::TableListRequest => "TableListRequest",
132        SyncMessage::TableListResponse { .. } => "TableListResponse",
133        SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
134        SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
135    }
136}