1use std::sync::Arc;
27
28use tracing::{debug, info, instrument};
29
30use crate::db::PulseDB;
31use crate::watch::ChangePoller;
32
33use super::applier::RemoteChangeApplier;
34use super::config::SyncConfig;
35use super::error::SyncError;
36use super::types::{
37 HandshakeRequest, HandshakeResponse, InstanceId, PullRequest, PullResponse, PushResponse,
38 SyncChange, SyncCursor,
39};
40use super::SYNC_PROTOCOL_VERSION;
41
42pub struct SyncServer {
50 db: Arc<PulseDB>,
51 instance_id: InstanceId,
52 config: SyncConfig,
53}
54
55impl SyncServer {
56 pub fn new(db: Arc<PulseDB>, config: SyncConfig) -> Self {
58 let instance_id = db.storage_for_test().instance_id();
59 Self {
60 db,
61 instance_id,
62 config,
63 }
64 }
65
66 pub fn instance_id(&self) -> InstanceId {
68 self.instance_id
69 }
70
71 #[instrument(skip(self, request), fields(peer = %request.instance_id))]
75 pub fn handle_handshake(
76 &self,
77 request: HandshakeRequest,
78 ) -> Result<HandshakeResponse, SyncError> {
79 if request.protocol_version != SYNC_PROTOCOL_VERSION {
80 return Ok(HandshakeResponse {
81 instance_id: self.instance_id,
82 protocol_version: SYNC_PROTOCOL_VERSION,
83 accepted: false,
84 reason: Some(format!(
85 "Protocol version mismatch: server v{}, client v{}",
86 SYNC_PROTOCOL_VERSION, request.protocol_version
87 )),
88 });
89 }
90
91 info!(peer = %request.instance_id, "Sync handshake accepted");
92 Ok(HandshakeResponse {
93 instance_id: self.instance_id,
94 protocol_version: SYNC_PROTOCOL_VERSION,
95 accepted: true,
96 reason: None,
97 })
98 }
99
100 #[instrument(skip(self, changes), fields(count = changes.len()))]
102 pub fn handle_push(&self, changes: Vec<SyncChange>) -> Result<PushResponse, SyncError> {
103 let max_seq = changes.iter().map(|c| c.sequence).max().unwrap_or(0);
104 let source = changes
105 .first()
106 .map(|c| c.source_instance)
107 .unwrap_or_else(InstanceId::nil);
108
109 let applier = RemoteChangeApplier::new(Arc::clone(&self.db), self.config.clone());
110 let result = applier.apply_batch(changes)?;
111
112 debug!(
113 accepted = result.applied,
114 skipped = result.skipped,
115 conflicts = result.conflicts,
116 "Handled push"
117 );
118
119 Ok(PushResponse {
120 accepted: result.applied,
121 rejected: result.skipped,
122 new_cursor: SyncCursor {
123 instance_id: source,
124 last_sequence: max_seq,
125 },
126 })
127 }
128
129 #[instrument(skip(self, request))]
131 pub fn handle_pull(&self, request: PullRequest) -> Result<PullResponse, SyncError> {
132 let storage = self.db.storage_for_test();
133 let mut poller = ChangePoller::from_sequence(request.cursor.last_sequence);
134
135 let events = poller
136 .poll_sync_events(storage)
137 .map_err(|e| SyncError::transport(format!("Failed to poll WAL for pull: {}", e)))?;
138
139 let mut changes = Vec::new();
141 for (sequence, record) in &events {
142 if let Some(change) =
143 build_change_from_record(&self.db, *sequence, record, self.instance_id)?
144 {
145 if let Some(ref allowed) = request.collectives {
147 if !allowed.contains(&change.collective_id) {
148 continue;
149 }
150 }
151 changes.push(change);
152 if changes.len() >= request.batch_size {
153 break;
154 }
155 }
156 }
157
158 let has_more = events.len() > changes.len();
159 let new_seq = changes
160 .last()
161 .map(|c| c.sequence)
162 .unwrap_or(request.cursor.last_sequence);
163
164 Ok(PullResponse {
165 changes,
166 has_more,
167 new_cursor: SyncCursor {
168 instance_id: self.instance_id,
169 last_sequence: new_seq,
170 },
171 })
172 }
173
174 pub fn handle_health(&self) -> Result<(), SyncError> {
176 let _seq = self
178 .db
179 .get_current_sequence()
180 .map_err(|e| SyncError::transport(format!("Health check failed: {}", e)))?;
181 Ok(())
182 }
183
184 pub fn handle_handshake_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
188 let request: HandshakeRequest = bincode::deserialize(body).map_err(SyncError::from)?;
189 let response = self.handle_handshake(request)?;
190 bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
191 }
192
193 pub fn handle_push_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
195 let changes: Vec<SyncChange> = bincode::deserialize(body).map_err(SyncError::from)?;
196 let response = self.handle_push(changes)?;
197 bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
198 }
199
200 pub fn handle_pull_bytes(&self, body: &[u8]) -> Result<Vec<u8>, SyncError> {
202 let request: PullRequest = bincode::deserialize(body).map_err(SyncError::from)?;
203 let response = self.handle_pull(request)?;
204 bincode::serialize(&response).map_err(|e| SyncError::serialization(e.to_string()))
205 }
206}
207
208fn build_change_from_record(
210 db: &PulseDB,
211 sequence: u64,
212 record: &crate::storage::schema::WatchEventRecord,
213 source_instance: InstanceId,
214) -> Result<Option<SyncChange>, SyncError> {
215 use super::types::{SerializableExperienceUpdate, SyncEntityType, SyncPayload};
216 use crate::storage::schema::{EntityTypeTag, WatchEventTypeTag};
217 use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
218
219 let collective_id = CollectiveId::from_bytes(record.collective_id);
220 let timestamp = Timestamp::from_millis(record.timestamp_ms);
221 let map_err = |e: crate::error::PulseDBError| {
222 SyncError::transport(format!("Failed to load entity: {}", e))
223 };
224
225 let entity_type = match record.entity_type {
226 EntityTypeTag::Experience => SyncEntityType::Experience,
227 EntityTypeTag::Relation => SyncEntityType::Relation,
228 EntityTypeTag::Insight => SyncEntityType::Insight,
229 EntityTypeTag::Collective => SyncEntityType::Collective,
230 };
231
232 let payload = match (record.entity_type, record.event_type) {
233 (EntityTypeTag::Experience, WatchEventTypeTag::Created) => {
234 let id = ExperienceId::from_bytes(record.entity_id);
235 db.get_experience(id)
236 .map_err(map_err)?
237 .map(SyncPayload::ExperienceCreated)
238 }
239 (EntityTypeTag::Experience, WatchEventTypeTag::Updated) => {
240 let id = ExperienceId::from_bytes(record.entity_id);
241 db.get_experience(id)
242 .map_err(map_err)?
243 .map(|exp| SyncPayload::ExperienceUpdated {
244 id,
245 update: SerializableExperienceUpdate {
246 importance: Some(exp.importance),
247 confidence: Some(exp.confidence),
248 domain: Some(exp.domain.clone()),
249 related_files: Some(exp.related_files.clone()),
250 archived: Some(exp.archived),
251 },
252 timestamp,
253 })
254 }
255 (EntityTypeTag::Experience, WatchEventTypeTag::Archived) => {
256 let id = ExperienceId::from_bytes(record.entity_id);
257 Some(SyncPayload::ExperienceArchived { id, timestamp })
258 }
259 (EntityTypeTag::Experience, WatchEventTypeTag::Deleted) => {
260 let id = ExperienceId::from_bytes(record.entity_id);
261 Some(SyncPayload::ExperienceDeleted { id, timestamp })
262 }
263 (EntityTypeTag::Relation, WatchEventTypeTag::Created) => {
264 let id = RelationId::from_bytes(record.entity_id);
265 db.get_relation(id)
266 .map_err(map_err)?
267 .map(SyncPayload::RelationCreated)
268 }
269 (EntityTypeTag::Relation, WatchEventTypeTag::Deleted) => {
270 let id = RelationId::from_bytes(record.entity_id);
271 Some(SyncPayload::RelationDeleted { id, timestamp })
272 }
273 (EntityTypeTag::Insight, WatchEventTypeTag::Created) => {
274 let id = InsightId::from_bytes(record.entity_id);
275 db.get_insight(id)
276 .map_err(map_err)?
277 .map(SyncPayload::InsightCreated)
278 }
279 (EntityTypeTag::Insight, WatchEventTypeTag::Deleted) => {
280 let id = InsightId::from_bytes(record.entity_id);
281 Some(SyncPayload::InsightDeleted { id, timestamp })
282 }
283 (EntityTypeTag::Collective, WatchEventTypeTag::Created) => {
284 let id = CollectiveId::from_bytes(record.entity_id);
285 db.get_collective(id)
286 .map_err(map_err)?
287 .map(SyncPayload::CollectiveCreated)
288 }
289 _ => None,
290 };
291
292 Ok(payload.map(|p| SyncChange {
293 sequence,
294 source_instance,
295 collective_id,
296 entity_type,
297 payload: p,
298 timestamp,
299 }))
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: `SyncServer` must be `Send + Sync`.
    #[test]
    fn test_sync_server_is_send_sync() {
        // Instantiating this with a type fails to compile unless the bounds hold.
        fn requires_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        requires_send_sync::<SyncServer>();
    }
}