citadel_sync/
transport.rs1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
/// Error type returned by [`SyncTransport`] operations.
///
/// Variants marked `#[from]` get a `From` impl from `thiserror`, so the
/// wrapped error types can be propagated into `SyncError` with `?`.
#[derive(Debug, thiserror::Error)]
pub enum SyncError {
    /// Wraps a `std::io::Error`.
    #[error("transport I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Wraps a `crate::protocol::ProtocolError`.
    #[error("protocol error: {0}")]
    Protocol(#[from] crate::protocol::ProtocolError),

    /// A message of one kind arrived where another was expected; both
    /// names are carried as strings for the error text.
    #[error("unexpected message: expected {expected}, got {actual}")]
    UnexpectedMessage { expected: String, actual: String },

    /// An error described by a free-form string, displayed as "remote error".
    #[error("remote error: {0}")]
    Remote(String),

    /// The transport is closed.
    #[error("transport closed")]
    Closed,

    /// Wraps a `citadel_core::Error`.
    #[error("database error: {0}")]
    Database(#[from] citadel_core::Error),

    /// Wraps a `crate::patch::PatchError`.
    #[error("patch error: {0}")]
    Patch(#[from] crate::patch::PatchError),

    /// Handshake failure described by a free-form string.
    #[error("handshake failed: {0}")]
    Handshake(String),
}
33
/// Channel over which [`SyncMessage`] values are sent and received.
///
/// All methods take `&self`, so implementations that need mutable state
/// must manage it internally. Implementors must be `Send`.
pub trait SyncTransport: Send {
    /// Sends `msg` over the transport.
    ///
    /// # Errors
    /// Returns a [`SyncError`] if the message could not be sent.
    fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;

    /// Receives one message from the transport.
    ///
    /// # Errors
    /// Returns a [`SyncError`] if no message could be obtained.
    fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;

    /// Closes the transport.
    ///
    /// # Errors
    /// Returns a [`SyncError`] if closing fails.
    fn close(&self) -> std::result::Result<(), SyncError>;
}
43
/// [`TreeReader`] implementation that answers page queries by exchanging
/// [`SyncMessage`]s over a borrowed [`SyncTransport`].
pub struct RemoteTreeReader<'a> {
    // Borrowed transport used for every request/response exchange.
    transport: &'a dyn SyncTransport,
    // Root page id, returned as-is by `root_info`.
    root_page: PageId,
    // Root hash, returned as-is by `root_info`.
    root_hash: MerkleHash,
}
53
54impl<'a> RemoteTreeReader<'a> {
55 pub fn new(transport: &'a dyn SyncTransport, root_page: PageId, root_hash: MerkleHash) -> Self {
56 Self {
57 transport,
58 root_page,
59 root_hash,
60 }
61 }
62}
63
64impl TreeReader for RemoteTreeReader<'_> {
65 fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
66 Ok((self.root_page, self.root_hash))
67 }
68
69 fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
70 self.transport
71 .send(&SyncMessage::DigestRequest {
72 page_ids: vec![page_id],
73 })
74 .map_err(sync_to_core)?;
75
76 match self.transport.recv().map_err(sync_to_core)? {
77 SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
78 Ok(digests.remove(0))
79 }
80 SyncMessage::DigestResponse { .. } => Err(citadel_core::Error::Io(
81 std::io::Error::new(std::io::ErrorKind::InvalidData, "empty digest response"),
82 )),
83 SyncMessage::Error { message } => {
84 Err(citadel_core::Error::Io(std::io::Error::other(message)))
85 }
86 other => Err(citadel_core::Error::Io(std::io::Error::new(
87 std::io::ErrorKind::InvalidData,
88 format!("expected DigestResponse, got {}", msg_name(&other)),
89 ))),
90 }
91 }
92
93 fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
94 self.transport
95 .send(&SyncMessage::EntriesRequest {
96 page_ids: vec![page_id],
97 })
98 .map_err(sync_to_core)?;
99
100 match self.transport.recv().map_err(sync_to_core)? {
101 SyncMessage::EntriesResponse { entries } => Ok(entries),
102 SyncMessage::Error { message } => {
103 Err(citadel_core::Error::Io(std::io::Error::other(message)))
104 }
105 other => Err(citadel_core::Error::Io(std::io::Error::new(
106 std::io::ErrorKind::InvalidData,
107 format!("expected EntriesResponse, got {}", msg_name(&other)),
108 ))),
109 }
110 }
111}
112
113fn sync_to_core(e: SyncError) -> citadel_core::Error {
114 citadel_core::Error::Io(std::io::Error::other(e.to_string()))
115}
116
117pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
118 match msg {
119 SyncMessage::Hello { .. } => "Hello",
120 SyncMessage::HelloAck { .. } => "HelloAck",
121 SyncMessage::DigestRequest { .. } => "DigestRequest",
122 SyncMessage::DigestResponse { .. } => "DigestResponse",
123 SyncMessage::EntriesRequest { .. } => "EntriesRequest",
124 SyncMessage::EntriesResponse { .. } => "EntriesResponse",
125 SyncMessage::PatchData { .. } => "PatchData",
126 SyncMessage::PatchAck { .. } => "PatchAck",
127 SyncMessage::Done => "Done",
128 SyncMessage::Error { .. } => "Error",
129 SyncMessage::PullRequest => "PullRequest",
130 SyncMessage::PullResponse { .. } => "PullResponse",
131 SyncMessage::TableListRequest => "TableListRequest",
132 SyncMessage::TableListResponse { .. } => "TableListResponse",
133 SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
134 SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
135 }
136}