Skip to main content

pulsedb/sync/
error.rs

//! Sync-specific error types.
//!
//! [`SyncError`] covers all failure modes in the sync protocol:
//! transport failures, handshake rejections, serialization issues,
//! timeouts, lost connections, protocol version mismatches, invalid
//! payloads, missing peer cursors, and shutdown coordination.
6
7use thiserror::Error;
8
9use super::types::InstanceId;
10
11/// Errors specific to the sync protocol.
12///
13/// These are wrapped into [`crate::PulseDBError::Sync`] when propagated
14/// through the public API.
15#[derive(Debug, Error)]
16pub enum SyncError {
17    /// Transport-level failure (network I/O, connection refused, etc.).
18    #[error("Sync transport error: {0}")]
19    Transport(String),
20
21    /// Handshake was rejected by the remote peer.
22    #[error("Sync handshake failed: {0}")]
23    Handshake(String),
24
25    /// Failed to serialize or deserialize a sync message.
26    #[error("Sync serialization error: {0}")]
27    Serialization(String),
28
29    /// Operation timed out waiting for a response.
30    #[error("Sync operation timed out")]
31    Timeout,
32
33    /// Connection to the remote peer was lost.
34    #[error("Connection to sync peer lost")]
35    ConnectionLost,
36
37    /// Protocol version mismatch between peers.
38    #[error("Sync protocol version mismatch: local v{local}, remote v{remote}")]
39    ProtocolVersion {
40        /// Local protocol version.
41        local: u32,
42        /// Remote protocol version.
43        remote: u32,
44    },
45
46    /// Received an invalid or unrecognized payload.
47    #[error("Invalid sync payload: {0}")]
48    InvalidPayload(String),
49
50    /// No cursor found for the specified peer instance.
51    #[error("No sync cursor found for instance {instance}")]
52    CursorNotFound {
53        /// The peer instance whose cursor was not found.
54        instance: InstanceId,
55    },
56
57    /// The sync system is shutting down.
58    #[error("Sync system is shutting down")]
59    Shutdown,
60}
61
62impl SyncError {
63    /// Creates a transport error with the given message.
64    pub fn transport(msg: impl Into<String>) -> Self {
65        Self::Transport(msg.into())
66    }
67
68    /// Creates a handshake error with the given message.
69    pub fn handshake(msg: impl Into<String>) -> Self {
70        Self::Handshake(msg.into())
71    }
72
73    /// Creates a serialization error with the given message.
74    pub fn serialization(msg: impl Into<String>) -> Self {
75        Self::Serialization(msg.into())
76    }
77
78    /// Creates an invalid payload error with the given message.
79    pub fn invalid_payload(msg: impl Into<String>) -> Self {
80        Self::InvalidPayload(msg.into())
81    }
82
83    /// Returns true if this is a transport error.
84    pub fn is_transport(&self) -> bool {
85        matches!(self, Self::Transport(_))
86    }
87
88    /// Returns true if this is a timeout error.
89    pub fn is_timeout(&self) -> bool {
90        matches!(self, Self::Timeout)
91    }
92
93    /// Returns true if this is a connection lost error.
94    pub fn is_connection_lost(&self) -> bool {
95        matches!(self, Self::ConnectionLost)
96    }
97
98    /// Returns true if this is a shutdown error.
99    pub fn is_shutdown(&self) -> bool {
100        matches!(self, Self::Shutdown)
101    }
102}
103
104impl From<bincode::Error> for SyncError {
105    fn from(err: bincode::Error) -> Self {
106        SyncError::Serialization(err.to_string())
107    }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant that can be built without project types renders the
    /// exact message declared in its `#[error]` attribute.
    #[test]
    fn test_sync_error_display() {
        let cases = [
            (
                SyncError::transport("connection refused"),
                "Sync transport error: connection refused",
            ),
            (
                SyncError::handshake("bad token"),
                "Sync handshake failed: bad token",
            ),
            (
                SyncError::serialization("unexpected eof"),
                "Sync serialization error: unexpected eof",
            ),
            (
                SyncError::invalid_payload("unknown tag"),
                "Invalid sync payload: unknown tag",
            ),
            (SyncError::Timeout, "Sync operation timed out"),
            (SyncError::ConnectionLost, "Connection to sync peer lost"),
            (SyncError::Shutdown, "Sync system is shutting down"),
        ];

        for (err, expected) in cases.iter() {
            assert_eq!(err.to_string(), *expected);
        }
    }

    /// Both version numbers are interpolated into the mismatch message.
    #[test]
    fn test_protocol_version_display() {
        let err = SyncError::ProtocolVersion {
            local: 1,
            remote: 2,
        };
        assert_eq!(
            err.to_string(),
            "Sync protocol version mismatch: local v1, remote v2"
        );
    }

    /// The convenience constructors produce the variant they are named
    /// after and keep the supplied message intact.
    #[test]
    fn test_sync_error_constructors() {
        assert!(matches!(
            SyncError::transport("t"),
            SyncError::Transport(ref m) if m == "t"
        ));
        assert!(matches!(
            SyncError::handshake("h"),
            SyncError::Handshake(ref m) if m == "h"
        ));
        assert!(matches!(
            SyncError::serialization("s"),
            SyncError::Serialization(ref m) if m == "s"
        ));
        assert!(matches!(
            SyncError::invalid_payload("p"),
            SyncError::InvalidPayload(ref m) if m == "p"
        ));
    }

    /// Each predicate answers `true` for its own variant.
    #[test]
    fn test_sync_error_is_checks() {
        assert!(SyncError::transport("x").is_transport());
        assert!(SyncError::Timeout.is_timeout());
        assert!(SyncError::ConnectionLost.is_connection_lost());
        assert!(SyncError::Shutdown.is_shutdown());
    }

    /// Each predicate answers `false` for other variants, so a predicate
    /// that always returned `true` would be caught.
    #[test]
    fn test_sync_error_is_checks_reject_other_variants() {
        assert!(!SyncError::handshake("x").is_transport());
        assert!(!SyncError::Timeout.is_transport());
        assert!(!SyncError::ConnectionLost.is_timeout());
        assert!(!SyncError::Shutdown.is_connection_lost());
        assert!(!SyncError::transport("x").is_shutdown());
    }

    /// A bincode error converts into `Serialization` and keeps its message.
    #[test]
    fn test_bincode_error_conversion() {
        // A single byte is too short for a (u64, u64), so deserialization fails.
        let truncated = [0u8; 1];
        let bincode_err = bincode::deserialize::<(u64, u64)>(&truncated[..]).unwrap_err();
        // Capture the message first: the conversion consumes the bincode error.
        let expected = bincode_err.to_string();

        let sync_err: SyncError = bincode_err.into();
        assert!(matches!(
            sync_err,
            SyncError::Serialization(ref msg) if *msg == expected
        ));
    }
}