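//! `citadeldb_sync/transport.rs`: the sync transport trait, its error type, and a
//! `TreeReader` implementation that reads a remote tree through that transport.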

1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
6/// Errors from sync transport operations.
7#[derive(Debug, thiserror::Error)]
8pub enum SyncError {
9    #[error("transport I/O error: {0}")]
10    Io(#[from] std::io::Error),
11
12    #[error("protocol error: {0}")]
13    Protocol(#[from] crate::protocol::ProtocolError),
14
15    #[error("unexpected message: expected {expected}, got {actual}")]
16    UnexpectedMessage { expected: String, actual: String },
17
18    #[error("remote error: {0}")]
19    Remote(String),
20
21    #[error("transport closed")]
22    Closed,
23
24    #[error("database error: {0}")]
25    Database(#[from] citadel_core::Error),
26
27    #[error("patch error: {0}")]
28    Patch(#[from] crate::patch::PatchError),
29
30    #[error("handshake failed: {0}")]
31    Handshake(String),
32}
33
34/// Bidirectional message transport for sync protocol.
35/// Uses `&self` with interior mutability for shared access during Merkle diff.
36pub trait SyncTransport: Send {
37    /// Send a message to the remote peer.
38    fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;
39
40    /// Receive the next message from the remote peer.
41    fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;
42
43    /// Close the transport connection.
44    fn close(&self) -> std::result::Result<(), SyncError>;
45}
46
47/// `TreeReader` that reads from a remote database via `SyncTransport`.
48///
49/// Sends `DigestRequest`/`EntriesRequest` messages and blocks waiting
50/// for responses. Used by `merkle_diff()` to compare trees across nodes.
51pub struct RemoteTreeReader<'a> {
52    transport: &'a dyn SyncTransport,
53    root_page: PageId,
54    root_hash: MerkleHash,
55}
56
57impl<'a> RemoteTreeReader<'a> {
58    pub fn new(
59        transport: &'a dyn SyncTransport,
60        root_page: PageId,
61        root_hash: MerkleHash,
62    ) -> Self {
63        Self { transport, root_page, root_hash }
64    }
65}
66
67impl TreeReader for RemoteTreeReader<'_> {
68    fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
69        Ok((self.root_page, self.root_hash))
70    }
71
72    fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
73        self.transport.send(&SyncMessage::DigestRequest {
74            page_ids: vec![page_id],
75        }).map_err(sync_to_core)?;
76
77        match self.transport.recv().map_err(sync_to_core)? {
78            SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
79                Ok(digests.remove(0))
80            }
81            SyncMessage::DigestResponse { .. } => {
82                Err(citadel_core::Error::Io(std::io::Error::new(
83                    std::io::ErrorKind::InvalidData,
84                    "empty digest response",
85                )))
86            }
87            SyncMessage::Error { message } => {
88                Err(citadel_core::Error::Io(std::io::Error::new(
89                    std::io::ErrorKind::Other,
90                    message,
91                )))
92            }
93            other => Err(citadel_core::Error::Io(std::io::Error::new(
94                std::io::ErrorKind::InvalidData,
95                format!("expected DigestResponse, got {}", msg_name(&other)),
96            ))),
97        }
98    }
99
100    fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
101        self.transport.send(&SyncMessage::EntriesRequest {
102            page_ids: vec![page_id],
103        }).map_err(sync_to_core)?;
104
105        match self.transport.recv().map_err(sync_to_core)? {
106            SyncMessage::EntriesResponse { entries } => Ok(entries),
107            SyncMessage::Error { message } => {
108                Err(citadel_core::Error::Io(std::io::Error::new(
109                    std::io::ErrorKind::Other,
110                    message,
111                )))
112            }
113            other => Err(citadel_core::Error::Io(std::io::Error::new(
114                std::io::ErrorKind::InvalidData,
115                format!("expected EntriesResponse, got {}", msg_name(&other)),
116            ))),
117        }
118    }
119}
120
121fn sync_to_core(e: SyncError) -> citadel_core::Error {
122    citadel_core::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
123}
124
125pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
126    match msg {
127        SyncMessage::Hello { .. } => "Hello",
128        SyncMessage::HelloAck { .. } => "HelloAck",
129        SyncMessage::DigestRequest { .. } => "DigestRequest",
130        SyncMessage::DigestResponse { .. } => "DigestResponse",
131        SyncMessage::EntriesRequest { .. } => "EntriesRequest",
132        SyncMessage::EntriesResponse { .. } => "EntriesResponse",
133        SyncMessage::PatchData { .. } => "PatchData",
134        SyncMessage::PatchAck { .. } => "PatchAck",
135        SyncMessage::Done => "Done",
136        SyncMessage::Error { .. } => "Error",
137        SyncMessage::PullRequest => "PullRequest",
138        SyncMessage::PullResponse { .. } => "PullResponse",
139        SyncMessage::TableListRequest => "TableListRequest",
140        SyncMessage::TableListResponse { .. } => "TableListResponse",
141        SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
142        SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
143    }
144}