citadel_sync/
transport.rs1use citadel_core::types::PageId;
2
3use crate::diff::{DiffEntry, MerkleHash, PageDigest, TreeReader};
4use crate::protocol::SyncMessage;
5
6#[derive(Debug, thiserror::Error)]
8pub enum SyncError {
9 #[error("transport I/O error: {0}")]
10 Io(#[from] std::io::Error),
11
12 #[error("protocol error: {0}")]
13 Protocol(#[from] crate::protocol::ProtocolError),
14
15 #[error("unexpected message: expected {expected}, got {actual}")]
16 UnexpectedMessage { expected: String, actual: String },
17
18 #[error("remote error: {0}")]
19 Remote(String),
20
21 #[error("transport closed")]
22 Closed,
23
24 #[error("database error: {0}")]
25 Database(#[from] citadel_core::Error),
26
27 #[error("patch error: {0}")]
28 Patch(#[from] crate::patch::PatchError),
29
30 #[error("handshake failed: {0}")]
31 Handshake(String),
32}
33
34pub trait SyncTransport: Send {
37 fn send(&self, msg: &SyncMessage) -> std::result::Result<(), SyncError>;
39
40 fn recv(&self) -> std::result::Result<SyncMessage, SyncError>;
42
43 fn close(&self) -> std::result::Result<(), SyncError>;
45}
46
47pub struct RemoteTreeReader<'a> {
52 transport: &'a dyn SyncTransport,
53 root_page: PageId,
54 root_hash: MerkleHash,
55}
56
57impl<'a> RemoteTreeReader<'a> {
58 pub fn new(transport: &'a dyn SyncTransport, root_page: PageId, root_hash: MerkleHash) -> Self {
59 Self {
60 transport,
61 root_page,
62 root_hash,
63 }
64 }
65}
66
67impl TreeReader for RemoteTreeReader<'_> {
68 fn root_info(&self) -> citadel_core::Result<(PageId, MerkleHash)> {
69 Ok((self.root_page, self.root_hash))
70 }
71
72 fn page_digest(&self, page_id: PageId) -> citadel_core::Result<PageDigest> {
73 self.transport
74 .send(&SyncMessage::DigestRequest {
75 page_ids: vec![page_id],
76 })
77 .map_err(sync_to_core)?;
78
79 match self.transport.recv().map_err(sync_to_core)? {
80 SyncMessage::DigestResponse { mut digests } if !digests.is_empty() => {
81 Ok(digests.remove(0))
82 }
83 SyncMessage::DigestResponse { .. } => Err(citadel_core::Error::Io(
84 std::io::Error::new(std::io::ErrorKind::InvalidData, "empty digest response"),
85 )),
86 SyncMessage::Error { message } => {
87 Err(citadel_core::Error::Io(std::io::Error::other(message)))
88 }
89 other => Err(citadel_core::Error::Io(std::io::Error::new(
90 std::io::ErrorKind::InvalidData,
91 format!("expected DigestResponse, got {}", msg_name(&other)),
92 ))),
93 }
94 }
95
96 fn leaf_entries(&self, page_id: PageId) -> citadel_core::Result<Vec<DiffEntry>> {
97 self.transport
98 .send(&SyncMessage::EntriesRequest {
99 page_ids: vec![page_id],
100 })
101 .map_err(sync_to_core)?;
102
103 match self.transport.recv().map_err(sync_to_core)? {
104 SyncMessage::EntriesResponse { entries } => Ok(entries),
105 SyncMessage::Error { message } => {
106 Err(citadel_core::Error::Io(std::io::Error::other(message)))
107 }
108 other => Err(citadel_core::Error::Io(std::io::Error::new(
109 std::io::ErrorKind::InvalidData,
110 format!("expected EntriesResponse, got {}", msg_name(&other)),
111 ))),
112 }
113 }
114}
115
116fn sync_to_core(e: SyncError) -> citadel_core::Error {
117 citadel_core::Error::Io(std::io::Error::other(e.to_string()))
118}
119
120pub(crate) fn msg_name(msg: &SyncMessage) -> &'static str {
121 match msg {
122 SyncMessage::Hello { .. } => "Hello",
123 SyncMessage::HelloAck { .. } => "HelloAck",
124 SyncMessage::DigestRequest { .. } => "DigestRequest",
125 SyncMessage::DigestResponse { .. } => "DigestResponse",
126 SyncMessage::EntriesRequest { .. } => "EntriesRequest",
127 SyncMessage::EntriesResponse { .. } => "EntriesResponse",
128 SyncMessage::PatchData { .. } => "PatchData",
129 SyncMessage::PatchAck { .. } => "PatchAck",
130 SyncMessage::Done => "Done",
131 SyncMessage::Error { .. } => "Error",
132 SyncMessage::PullRequest => "PullRequest",
133 SyncMessage::PullResponse { .. } => "PullResponse",
134 SyncMessage::TableListRequest => "TableListRequest",
135 SyncMessage::TableListResponse { .. } => "TableListResponse",
136 SyncMessage::TableSyncBegin { .. } => "TableSyncBegin",
137 SyncMessage::TableSyncEnd { .. } => "TableSyncEnd",
138 }
139}