Skip to main content

pulsedb/sync/
server.rs

1//! Server-side sync handler for HTTP consumers.
2//!
3//! [`SyncServer`] provides framework-agnostic methods for handling sync
4//! requests. Consumers wire these into their web framework (Axum, Actix, etc.)
5//! without PulseDB taking a dependency on any web framework.
6//!
7//! # Example (Axum)
8//!
9//! ```rust,ignore
10//! use std::sync::Arc;
11//! use axum::{Router, routing::{get, post}, extract::State, body::Bytes, http::StatusCode};
12//! use pulsedb::sync::server::SyncServer;
13//!
14//! async fn handle_health(State(server): State<Arc<SyncServer>>) -> StatusCode {
15//!     match server.handle_health() {
16//!         Ok(()) => StatusCode::OK,
17//!         Err(_) => StatusCode::SERVICE_UNAVAILABLE,
18//!     }
19//! }
20//!
21//! async fn handle_handshake(State(server): State<Arc<SyncServer>>, body: Bytes) -> Result<Vec<u8>, StatusCode> {
22//!     server.handle_handshake_bytes(&body).map_err(|_| StatusCode::BAD_REQUEST)
23//! }
24//! ```
25
26use std::sync::Arc;
27
28use tracing::{debug, info, instrument};
29
30use crate::db::PulseDB;
31use crate::watch::ChangePoller;
32
33use super::applier::RemoteChangeApplier;
34use super::config::SyncConfig;
35use super::error::SyncError;
36use super::types::{
37    HandshakeRequest, HandshakeResponse, InstanceId, PullRequest, PullResponse, PushResponse,
38    SyncChange, SyncCursor,
39};
40use super::SYNC_PROTOCOL_VERSION;
41
/// Server-side sync handler.
///
/// Processes incoming sync requests from remote peers. Framework-agnostic —
/// consumers create web handlers that delegate to this struct's methods.
///
/// Pull requests are served by constructing a fresh `ChangePoller` positioned
/// at the cursor supplied in each request; push handling is delegated to a
/// `RemoteChangeApplier` that is built per call.
pub struct SyncServer {
    /// Shared handle to the database that changes are read from and applied to.
    db: Arc<PulseDB>,
    /// Instance ID read from the database's storage when the server is constructed.
    instance_id: InstanceId,
    /// Sync configuration; cloned into the `RemoteChangeApplier` on every push.
    config: SyncConfig,
}
54
55impl SyncServer {
56    /// Creates a new SyncServer for the given database.
57    pub fn new(db: Arc<PulseDB>, config: SyncConfig) -> Self {
58        let instance_id = db.storage_for_test().instance_id();
59        Self {
60            db,
61            instance_id,
62            config,
63        }
64    }
65
    /// Returns the server's instance ID.
    ///
    /// This is the value cached at construction time; it is the ID reported in
    /// handshake responses and in the cursor returned from pull requests.
    pub fn instance_id(&self) -> InstanceId {
        self.instance_id
    }
70
71    // ─── High-level handlers (typed) ─────────────────────────────────
72
73    /// Handles a handshake request.
74    #[instrument(skip(self, request), fields(peer = %request.instance_id))]
75    pub fn handle_handshake(
76        &self,
77        request: HandshakeRequest,
78    ) -> Result<HandshakeResponse, SyncError> {
79        if request.protocol_version != SYNC_PROTOCOL_VERSION {
80            return Ok(HandshakeResponse {
81                instance_id: self.instance_id,
82                protocol_version: SYNC_PROTOCOL_VERSION,
83                accepted: false,
84                reason: Some(format!(
85                    "Protocol version mismatch: server v{}, client v{}",
86                    SYNC_PROTOCOL_VERSION, request.protocol_version
87                )),
88            });
89        }
90
91        info!(peer = %request.instance_id, "Sync handshake accepted");
92        Ok(HandshakeResponse {
93            instance_id: self.instance_id,
94            protocol_version: SYNC_PROTOCOL_VERSION,
95            accepted: true,
96            reason: None,
97        })
98    }
99
100    /// Handles a push request — applies remote changes locally.
101    #[instrument(skip(self, changes), fields(count = changes.len()))]
102    pub fn handle_push(&self, changes: Vec<SyncChange>) -> Result<PushResponse, SyncError> {
103        let max_seq = changes.iter().map(|c| c.sequence).max().unwrap_or(0);
104        let source = changes
105            .first()
106            .map(|c| c.source_instance)
107            .unwrap_or_else(InstanceId::nil);
108
109        let applier = RemoteChangeApplier::new(Arc::clone(&self.db), self.config.clone());
110        let result = applier.apply_batch(changes)?;
111
112        debug!(
113            accepted = result.applied,
114            skipped = result.skipped,
115            conflicts = result.conflicts,
116            "Handled push"
117        );
118
119        Ok(PushResponse {
120            accepted: result.applied,
121            rejected: result.skipped,
122            new_cursor: SyncCursor {
123                instance_id: source,
124                last_sequence: max_seq,
125            },
126        })
127    }
128
129    /// Handles a pull request — serves local changes to the remote peer.
130    #[instrument(skip(self, request))]
131    pub fn handle_pull(&self, request: PullRequest) -> Result<PullResponse, SyncError> {
132        let storage = self.db.storage_for_test();
133        let mut poller = ChangePoller::from_sequence(request.cursor.last_sequence);
134
135        let events = poller
136            .poll_sync_events(storage)
137            .map_err(|e| SyncError::transport(format!("Failed to poll WAL for pull: {}", e)))?;
138
139        // Build SyncChanges from WAL events (same logic as pusher)
140        let mut changes = Vec::new();
141        for (sequence, record) in &events {
142            if let Some(change) =
143                build_change_from_record(&self.db, *sequence, record, self.instance_id)?
144            {
145                // Apply collective filter
146                if let Some(ref allowed) = request.collectives {
147                    if !allowed.contains(&change.collective_id) {
148                        continue;
149                    }
150                }
151                changes.push(change);
152                if changes.len() >= request.batch_size {
153                    break;
154                }
155            }
156        }
157
158        let has_more = events.len() > changes.len();
159        let new_seq = changes
160            .last()
161            .map(|c| c.sequence)
162            .unwrap_or(request.cursor.last_sequence);
163
164        Ok(PullResponse {
165            changes,
166            has_more,
167            new_cursor: SyncCursor {
168                instance_id: self.instance_id,
169                last_sequence: new_seq,
170            },
171        })
172    }
173
174    /// Handles a health check.
175    pub fn handle_health(&self) -> Result<(), SyncError> {
176        // Verify DB is accessible by reading metadata
177        let _seq = self
178            .db
179            .get_current_sequence()
180            .map_err(|e| SyncError::transport(format!("Health check failed: {}", e)))?;
181        Ok(())
182    }
183
184    // ─── Byte-level handlers (bincode in/out for HTTP) ───────────────
185
186    /// Handles a handshake from raw bincode bytes.
187    pub fn handle_handshake_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
188        let request: HandshakeRequest = bincode::deserialize(body).map_err(SyncError::from)?;
189        let response = self.handle_handshake(request)?;
190        bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
191    }
192
193    /// Handles a push from raw bincode bytes.
194    pub fn handle_push_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
195        let changes: Vec<SyncChange> = bincode::deserialize(body).map_err(SyncError::from)?;
196        let response = self.handle_push(changes)?;
197        bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
198    }
199
200    /// Handles a pull from raw bincode bytes.
201    pub fn handle_pull_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
202        let request: PullRequest = bincode::deserialize(body).map_err(SyncError::from)?;
203        let response = self.handle_pull(request)?;
204        bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
205    }
206}
207
208/// Build a SyncChange from a WAL record by loading the full entity.
209fn build_change_from_record(
210    db: &PulseDB,
211    sequence: u64,
212    record: &crate::storage::schema::WatchEventRecord,
213    source_instance: InstanceId,
214) -> Result<Option<SyncChange>, SyncError> {
215    use super::types::{SerializableExperienceUpdate, SyncEntityType, SyncPayload};
216    use crate::storage::schema::{EntityTypeTag, WatchEventTypeTag};
217    use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
218
219    let collective_id = CollectiveId::from_bytes(record.collective_id);
220    let timestamp = Timestamp::from_millis(record.timestamp_ms);
221    let map_err = |e: crate::error::PulseDBError| {
222        SyncError::transport(format!("Failed to load entity: {}", e))
223    };
224
225    let entity_type = match record.entity_type {
226        EntityTypeTag::Experience => SyncEntityType::Experience,
227        EntityTypeTag::Relation => SyncEntityType::Relation,
228        EntityTypeTag::Insight => SyncEntityType::Insight,
229        EntityTypeTag::Collective => SyncEntityType::Collective,
230    };
231
232    let payload = match (record.entity_type, record.event_type) {
233        (EntityTypeTag::Experience, WatchEventTypeTag::Created) => {
234            let id = ExperienceId::from_bytes(record.entity_id);
235            db.get_experience(id)
236                .map_err(map_err)?
237                .map(SyncPayload::ExperienceCreated)
238        }
239        (EntityTypeTag::Experience, WatchEventTypeTag::Updated) => {
240            let id = ExperienceId::from_bytes(record.entity_id);
241            db.get_experience(id)
242                .map_err(map_err)?
243                .map(|exp| SyncPayload::ExperienceUpdated {
244                    id,
245                    update: SerializableExperienceUpdate {
246                        importance: Some(exp.importance),
247                        confidence: Some(exp.confidence),
248                        domain: Some(exp.domain.clone()),
249                        related_files: Some(exp.related_files.clone()),
250                        archived: Some(exp.archived),
251                    },
252                    timestamp,
253                })
254        }
255        (EntityTypeTag::Experience, WatchEventTypeTag::Archived) => {
256            let id = ExperienceId::from_bytes(record.entity_id);
257            Some(SyncPayload::ExperienceArchived { id, timestamp })
258        }
259        (EntityTypeTag::Experience, WatchEventTypeTag::Deleted) => {
260            let id = ExperienceId::from_bytes(record.entity_id);
261            Some(SyncPayload::ExperienceDeleted { id, timestamp })
262        }
263        (EntityTypeTag::Relation, WatchEventTypeTag::Created) => {
264            let id = RelationId::from_bytes(record.entity_id);
265            db.get_relation(id)
266                .map_err(map_err)?
267                .map(SyncPayload::RelationCreated)
268        }
269        (EntityTypeTag::Relation, WatchEventTypeTag::Deleted) => {
270            let id = RelationId::from_bytes(record.entity_id);
271            Some(SyncPayload::RelationDeleted { id, timestamp })
272        }
273        (EntityTypeTag::Insight, WatchEventTypeTag::Created) => {
274            let id = InsightId::from_bytes(record.entity_id);
275            db.get_insight(id)
276                .map_err(map_err)?
277                .map(SyncPayload::InsightCreated)
278        }
279        (EntityTypeTag::Insight, WatchEventTypeTag::Deleted) => {
280            let id = InsightId::from_bytes(record.entity_id);
281            Some(SyncPayload::InsightDeleted { id, timestamp })
282        }
283        (EntityTypeTag::Collective, WatchEventTypeTag::Created) => {
284            let id = CollectiveId::from_bytes(record.entity_id);
285            db.get_collective(id)
286                .map_err(map_err)?
287                .map(SyncPayload::CollectiveCreated)
288        }
289        _ => None,
290    };
291
292    Ok(payload.map(|p| SyncChange {
293        sequence,
294        source_instance,
295        collective_id,
296        entity_type,
297        payload: p,
298        timestamp,
299    }))
300}
301
// Compile-time guard: `SyncServer` must remain `Send + Sync` so it can be
// shared behind an `Arc` across request handlers, as in the module example.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sync_server_is_send_sync() {
        // Fails to compile (rather than at runtime) if either bound is lost.
        fn require_send_sync<T>()
        where
            T: Send + Sync,
        {
        }
        require_send_sync::<SyncServer>();
    }
}