citadeldb_sync/
transport.rs1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
6#[derive(Debug, thiserror::Error)]
8pub enum SyncError {
9 #[error("transport I/O error: {0}")]
10 Io(#[from] std::io::Error),
11
12 #[error("protocol error: {0}")]
13 Protocol(#[from] crate::protocol::ProtocolError),
14
15 #[error("unexpected message: expected {expected}, got {actual}")]
16 UnexpectedMessage { expected: String, actual: String },
17
18 #[error("remote error: {0}")]
19 Remote(String),
20
21 #[error("transport closed")]
22 Closed,
23
24 #[error("database error: {0}")]
25 Database(#[from] citadel_core::Error),
26
27 #[error("patch error: {0}")]
28 Patch(#[from] crate::patch::PatchError),
29
30 #[error("handshake failed: {0}")]
31 Handshake(String),
32}
33
34pub trait SyncTransport: Send {
37 fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;
39
40 fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;
42
43 fn close(&self) -> std::result::Result<(), SyncError>;
45}
46
47pub struct RemoteTreeReader<'a> {
52 transport: &'a dyn SyncTransport,
53 root_page: PageId,
54 root_hash: MerkleHash,
55}
56
57impl<'a> RemoteTreeReader<'a> {
58 pub fn new(
59 transport: &'a dyn SyncTransport,
60 root_page: PageId,
61 root_hash: MerkleHash,
62 ) -> Self {
63 Self { transport, root_page, root_hash }
64 }
65}
66
67impl TreeReader for RemoteTreeReader<'_> {
68 fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
69 Ok((self.root_page, self.root_hash))
70 }
71
72 fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
73 self.transport.send(&SyncMessage::DigestRequest {
74 page_ids: vec![page_id],
75 }).map_err(sync_to_core)?;
76
77 match self.transport.recv().map_err(sync_to_core)? {
78 SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
79 Ok(digests.remove(0))
80 }
81 SyncMessage::DigestResponse { .. } => {
82 Err(citadel_core::Error::Io(std::io::Error::new(
83 std::io::ErrorKind::InvalidData,
84 "empty digest response",
85 )))
86 }
87 SyncMessage::Error { message } => {
88 Err(citadel_core::Error::Io(std::io::Error::new(
89 std::io::ErrorKind::Other,
90 message,
91 )))
92 }
93 other => Err(citadel_core::Error::Io(std::io::Error::new(
94 std::io::ErrorKind::InvalidData,
95 format!("expected DigestResponse, got {}", msg_name(&other)),
96 ))),
97 }
98 }
99
100 fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
101 self.transport.send(&SyncMessage::EntriesRequest {
102 page_ids: vec![page_id],
103 }).map_err(sync_to_core)?;
104
105 match self.transport.recv().map_err(sync_to_core)? {
106 SyncMessage::EntriesResponse { entries } => Ok(entries),
107 SyncMessage::Error { message } => {
108 Err(citadel_core::Error::Io(std::io::Error::new(
109 std::io::ErrorKind::Other,
110 message,
111 )))
112 }
113 other => Err(citadel_core::Error::Io(std::io::Error::new(
114 std::io::ErrorKind::InvalidData,
115 format!("expected EntriesResponse, got {}", msg_name(&other)),
116 ))),
117 }
118 }
119}
120
121fn sync_to_core(e: SyncError) -> citadel_core::Error {
122 citadel_core::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
123}
124
125pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
126 match msg {
127 SyncMessage::Hello { .. } => "Hello",
128 SyncMessage::HelloAck { .. } => "HelloAck",
129 SyncMessage::DigestRequest { .. } => "DigestRequest",
130 SyncMessage::DigestResponse { .. } => "DigestResponse",
131 SyncMessage::EntriesRequest { .. } => "EntriesRequest",
132 SyncMessage::EntriesResponse { .. } => "EntriesResponse",
133 SyncMessage::PatchData { .. } => "PatchData",
134 SyncMessage::PatchAck { .. } => "PatchAck",
135 SyncMessage::Done => "Done",
136 SyncMessage::Error { .. } => "Error",
137 SyncMessage::PullRequest => "PullRequest",
138 SyncMessage::PullResponse { .. } => "PullResponse",
139 SyncMessage::TableListRequest => "TableListRequest",
140 SyncMessage::TableListResponse { .. } => "TableListResponse",
141 SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
142 SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
143 }
144}