Skip to main content

citadel_sync/
transport.rs

1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
6/// Errors from sync transport operations.
7#[derive(Debug, thiserror::Error)]
8pub enum SyncError {
9    #[error("transport I/O error: {0}")]
10    Io(#[from] std::io::Error),
11
12    #[error("protocol error: {0}")]
13    Protocol(#[from] crate::protocol::ProtocolError),
14
15    #[error("unexpected message: expected {expected}, got {actual}")]
16    UnexpectedMessage { expected: String, actual: String },
17
18    #[error("remote error: {0}")]
19    Remote(String),
20
21    #[error("transport closed")]
22    Closed,
23
24    #[error("database error: {0}")]
25    Database(#[from] citadel_core::Error),
26
27    #[error("patch error: {0}")]
28    Patch(#[from] crate::patch::PatchError),
29
30    #[error("handshake failed: {0}")]
31    Handshake(String),
32}
33
34/// Bidirectional message transport for sync protocol.
35/// Uses `&self` with interior mutability for shared access during Merkle diff.
36pub trait SyncTransport: Send {
37    /// Send a message to the remote peer.
38    fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;
39
40    /// Receive the next message from the remote peer.
41    fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;
42
43    /// Close the transport connection.
44    fn close(&self) -> std::result::Result<(), SyncError>;
45}
46
47/// `TreeReader` that reads from a remote database via `SyncTransport`.
48///
49/// Sends `DigestRequest`/`EntriesRequest` messages and blocks waiting
50/// for responses. Used by `merkle_diff()` to compare trees across nodes.
51pub struct RemoteTreeReader<'a> {
52    transport: &'a dyn SyncTransport,
53    root_page: PageId,
54    root_hash: MerkleHash,
55}
56
57impl<'a> RemoteTreeReader<'a> {
58    pub fn new(transport: &'a dyn SyncTransport, root_page: PageId, root_hash: MerkleHash) -> Self {
59        Self {
60            transport,
61            root_page,
62            root_hash,
63        }
64    }
65}
66
67impl TreeReader for RemoteTreeReader<'_> {
68    fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
69        Ok((self.root_page, self.root_hash))
70    }
71
72    fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
73        self.transport
74            .send(&SyncMessage::DigestRequest {
75                page_ids: vec![page_id],
76            })
77            .map_err(sync_to_core)?;
78
79        match self.transport.recv().map_err(sync_to_core)? {
80            SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
81                Ok(digests.remove(0))
82            }
83            SyncMessage::DigestResponse { .. } => Err(citadel_core::Error::Io(
84                std::io::Error::new(std::io::ErrorKind::InvalidData, "empty digest response"),
85            )),
86            SyncMessage::Error { message } => {
87                Err(citadel_core::Error::Io(std::io::Error::other(message)))
88            }
89            other => Err(citadel_core::Error::Io(std::io::Error::new(
90                std::io::ErrorKind::InvalidData,
91                format!("expected DigestResponse, got {}", msg_name(&other)),
92            ))),
93        }
94    }
95
96    fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
97        self.transport
98            .send(&SyncMessage::EntriesRequest {
99                page_ids: vec![page_id],
100            })
101            .map_err(sync_to_core)?;
102
103        match self.transport.recv().map_err(sync_to_core)? {
104            SyncMessage::EntriesResponse { entries } => Ok(entries),
105            SyncMessage::Error { message } => {
106                Err(citadel_core::Error::Io(std::io::Error::other(message)))
107            }
108            other => Err(citadel_core::Error::Io(std::io::Error::new(
109                std::io::ErrorKind::InvalidData,
110                format!("expected EntriesResponse, got {}", msg_name(&other)),
111            ))),
112        }
113    }
114}
115
116fn sync_to_core(e: SyncError) -> citadel_core::Error {
117    citadel_core::Error::Io(std::io::Error::other(e.to_string()))
118}
119
120pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
121    match msg {
122        SyncMessage::Hello { .. } => "Hello",
123        SyncMessage::HelloAck { .. } => "HelloAck",
124        SyncMessage::DigestRequest { .. } => "DigestRequest",
125        SyncMessage::DigestResponse { .. } => "DigestResponse",
126        SyncMessage::EntriesRequest { .. } => "EntriesRequest",
127        SyncMessage::EntriesResponse { .. } => "EntriesResponse",
128        SyncMessage::PatchData { .. } => "PatchData",
129        SyncMessage::PatchAck { .. } => "PatchAck",
130        SyncMessage::Done => "Done",
131        SyncMessage::Error { .. } => "Error",
132        SyncMessage::PullRequest => "PullRequest",
133        SyncMessage::PullResponse { .. } => "PullResponse",
134        SyncMessage::TableListRequest => "TableListRequest",
135        SyncMessage::TableListResponse { .. } => "TableListResponse",
136        SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
137        SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
138    }
139}