1use aes_gcm::{
4 aead::{Aead, KeyInit},
5 Aes256Gcm, Key, Nonce,
6};
7use argon2::{Argon2, Params};
8use rand::RngCore;
9use std::collections::VecDeque;
10use thiserror::Error;
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13pub enum SyncItem {
14 Agent(String),
15 MemoryNamespace(String),
16}
17
18#[derive(Debug, Clone, PartialEq)]
19pub enum ConflictStrategy {
20 LastWriterWins,
21 Manual,
22 Merge,
23}
24
25#[derive(Debug, Clone)]
26pub enum PeerStatus {
27 Connected,
28 Disconnected,
29}
30
31#[derive(Debug, Clone)]
32pub struct PeerDevice {
33 pub device_id: String,
34 pub last_sync: Option<String>,
35 pub status: PeerStatus,
36 pub pending_changes: usize,
37}
38
39#[derive(Debug, Default)]
40pub struct SyncResult {
41 pub synced_items: usize,
42 pub conflicts_resolved: usize,
43}
44
45#[derive(Debug, Error)]
46pub enum SyncError {
47 #[error("sync not enabled — call enable() first")]
48 NotEnabled,
49 #[error("key derivation failed: {0}")]
50 KeyDerivation(String),
51 #[error("encryption failed")]
52 Encryption,
53 #[error("decryption failed")]
54 Decryption,
55 #[error("ciphertext too short")]
56 CiphertextTooShort,
57}
58
59#[derive(Debug)]
60pub struct ConflictEntry {
61 pub key: String,
62 pub discarded_value: Vec<u8>,
63 pub winner_ts: u64,
64}
65
66pub struct SyncEngine {
67 key: Option<[u8; 32]>,
68 pub selected: Vec<SyncItem>,
69 conflict_strategy: ConflictStrategy,
70 queue: VecDeque<(String, Vec<u8>)>,
71 conflict_log: Vec<ConflictEntry>,
72}
73
74impl SyncEngine {
75 pub fn new() -> Self {
76 Self {
77 key: None,
78 selected: Vec::new(),
79 conflict_strategy: ConflictStrategy::LastWriterWins,
80 queue: VecDeque::new(),
81 conflict_log: Vec::new(),
82 }
83 }
84
85 pub fn enable(&mut self, shared_secret: &str) -> Result<(), SyncError> {
86 let salt = b"openhawk-sync-v1";
87 let params = Params::new(65536, 3, 1, Some(32))
88 .map_err(|e| SyncError::KeyDerivation(e.to_string()))?;
89 let argon2 = Argon2::new(argon2::Algorithm::Argon2id, argon2::Version::V0x13, params);
90 let mut key = [0u8; 32];
91 argon2
92 .hash_password_into(shared_secret.as_bytes(), salt, &mut key)
93 .map_err(|e| SyncError::KeyDerivation(e.to_string()))?;
94 self.key = Some(key);
95 Ok(())
96 }
97
98 pub fn discover_peers(&self) -> Vec<PeerDevice> {
100 Vec::new()
101 }
102
103 pub fn select_for_sync(&mut self, item: SyncItem) {
104 if !self.selected.contains(&item) {
105 self.selected.push(item);
106 }
107 }
108
109 pub fn is_selected(&self, item: &SyncItem) -> bool {
110 self.selected.contains(item)
111 }
112
113 pub fn sync(&self) -> Result<SyncResult, SyncError> {
114 if self.key.is_none() {
115 return Err(SyncError::NotEnabled);
116 }
117 Ok(SyncResult { synced_items: self.selected.len(), conflicts_resolved: 0 })
118 }
119
120 pub fn set_conflict_strategy(&mut self, strategy: ConflictStrategy) {
121 self.conflict_strategy = strategy;
122 }
123
124 pub fn get_conflict_strategy(&self) -> &ConflictStrategy {
125 &self.conflict_strategy
126 }
127
128 pub fn resolve_conflict(
129 &mut self,
130 key: &str,
131 local_value: &[u8],
132 local_ts: u64,
133 remote_value: &[u8],
134 remote_ts: u64,
135 ) -> Vec<u8> {
136 match self.conflict_strategy {
137 ConflictStrategy::LastWriterWins => {
138 if remote_ts > local_ts {
139 self.conflict_log.push(ConflictEntry {
140 key: key.to_string(),
141 discarded_value: local_value.to_vec(),
142 winner_ts: remote_ts,
143 });
144 remote_value.to_vec()
145 } else {
146 self.conflict_log.push(ConflictEntry {
147 key: key.to_string(),
148 discarded_value: remote_value.to_vec(),
149 winner_ts: local_ts,
150 });
151 local_value.to_vec()
152 }
153 }
154 ConflictStrategy::Manual => local_value.to_vec(),
155 ConflictStrategy::Merge => {
156 let mut merged = local_value.to_vec();
157 merged.extend_from_slice(remote_value);
158 merged
159 }
160 }
161 }
162
163 pub fn queue_change(&mut self, key: &str, value: &[u8]) {
164 self.queue.push_back((key.to_string(), value.to_vec()));
165 }
166
167 pub fn get_queued_count(&self) -> usize {
168 self.queue.len()
169 }
170
171 pub fn flush_queue(&mut self) -> Vec<(String, Vec<u8>)> {
172 self.queue.drain(..).collect()
173 }
174
175 pub fn conflict_log(&self) -> &[ConflictEntry] {
176 &self.conflict_log
177 }
178
179 pub fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, SyncError> {
180 let key_bytes = self.key.ok_or(SyncError::NotEnabled)?;
181 let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(&key_bytes));
182 let mut nonce_bytes = [0u8; 12];
183 rand::thread_rng().fill_bytes(&mut nonce_bytes);
184 let nonce = Nonce::from_slice(&nonce_bytes);
185 let ciphertext = cipher.encrypt(nonce, data).map_err(|_| SyncError::Encryption)?;
186 let mut out = Vec::with_capacity(12 + ciphertext.len());
187 out.extend_from_slice(&nonce_bytes);
188 out.extend_from_slice(&ciphertext);
189 Ok(out)
190 }
191
192 pub fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, SyncError> {
193 if data.len() < 12 {
194 return Err(SyncError::CiphertextTooShort);
195 }
196 let key_bytes = self.key.ok_or(SyncError::NotEnabled)?;
197 let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(&key_bytes));
198 let nonce = Nonce::from_slice(&data[..12]);
199 cipher.decrypt(nonce, &data[12..]).map_err(|_| SyncError::Decryption)
200 }
201}
202
203impl Default for SyncEngine {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212
213 fn enabled_engine() -> SyncEngine {
214 let mut e = SyncEngine::new();
215 e.enable("test-shared-secret").unwrap();
216 e
217 }
218
219 #[test]
220 fn encrypt_decrypt_roundtrip() {
221 let engine = enabled_engine();
222 let plaintext = b"hello, sync world";
223 let ciphertext = engine.encrypt(plaintext).unwrap();
224 let recovered = engine.decrypt(&ciphertext).unwrap();
225 assert_eq!(recovered, plaintext);
226 }
227
228 #[test]
229 fn encrypt_produces_different_ciphertext_each_call() {
230 let engine = enabled_engine();
231 let data = b"same data";
232 let c1 = engine.encrypt(data).unwrap();
233 let c2 = engine.encrypt(data).unwrap();
234 assert_ne!(c1, c2);
235 }
236
237 #[test]
238 fn decrypt_fails_without_enable() {
239 let engine = SyncEngine::new();
240 let data = vec![0u8; 20];
242 assert!(matches!(engine.decrypt(&data), Err(SyncError::NotEnabled)));
243 }
244
245 #[test]
246 fn decrypt_fails_on_short_input() {
247 let engine = enabled_engine();
248 assert!(matches!(engine.decrypt(b"short"), Err(SyncError::CiphertextTooShort)));
249 }
250
251 #[test]
252 fn decrypt_fails_on_tampered_ciphertext() {
253 let engine = enabled_engine();
254 let mut ct = engine.encrypt(b"data").unwrap();
255 let last = ct.len() - 1;
256 ct[last] ^= 0xff;
257 assert!(matches!(engine.decrypt(&ct), Err(SyncError::Decryption)));
258 }
259
260 #[test]
261 fn discover_peers_returns_empty_stub() {
262 let engine = SyncEngine::new();
263 assert!(engine.discover_peers().is_empty());
264 }
265
266 #[test]
267 fn select_for_sync_marks_item() {
268 let mut engine = SyncEngine::new();
269 let item = SyncItem::Agent("my-agent".to_string());
270 assert!(!engine.is_selected(&item));
271 engine.select_for_sync(item.clone());
272 assert!(engine.is_selected(&item));
273 }
274
275 #[test]
276 fn select_for_sync_deduplicates() {
277 let mut engine = SyncEngine::new();
278 let item = SyncItem::MemoryNamespace("ns1".to_string());
279 engine.select_for_sync(item.clone());
280 engine.select_for_sync(item.clone());
281 assert_eq!(engine.selected.len(), 1);
282 }
283
284 #[test]
285 fn unselected_item_not_synced() {
286 let mut engine = enabled_engine();
287 engine.select_for_sync(SyncItem::Agent("agent-a".to_string()));
288 let result = engine.sync().unwrap();
289 assert_eq!(result.synced_items, 1);
290 assert!(!engine.is_selected(&SyncItem::Agent("agent-b".to_string())));
291 }
292
293 #[test]
294 fn sync_requires_enable() {
295 let engine = SyncEngine::new();
296 assert!(matches!(engine.sync(), Err(SyncError::NotEnabled)));
297 }
298
299 #[test]
300 fn last_writer_wins_remote_newer() {
301 let mut engine = enabled_engine();
302 let winner = engine.resolve_conflict("key", b"local", 100, b"remote", 200);
303 assert_eq!(winner, b"remote");
304 assert_eq!(engine.conflict_log().len(), 1);
305 assert_eq!(engine.conflict_log()[0].discarded_value, b"local");
306 }
307
308 #[test]
309 fn last_writer_wins_local_newer() {
310 let mut engine = enabled_engine();
311 let winner = engine.resolve_conflict("key", b"local", 300, b"remote", 200);
312 assert_eq!(winner, b"local");
313 assert_eq!(engine.conflict_log()[0].discarded_value, b"remote");
314 }
315
316 #[test]
317 fn manual_strategy_keeps_local() {
318 let mut engine = enabled_engine();
319 engine.set_conflict_strategy(ConflictStrategy::Manual);
320 let winner = engine.resolve_conflict("key", b"local", 100, b"remote", 999);
321 assert_eq!(winner, b"local");
322 }
323
324 #[test]
325 fn merge_strategy_concatenates() {
326 let mut engine = enabled_engine();
327 engine.set_conflict_strategy(ConflictStrategy::Merge);
328 let winner = engine.resolve_conflict("key", b"abc", 1, b"def", 2);
329 assert_eq!(winner, b"abcdef");
330 }
331
332 #[test]
333 fn queue_change_increments_count() {
334 let mut engine = SyncEngine::new();
335 assert_eq!(engine.get_queued_count(), 0);
336 engine.queue_change("k1", b"v1");
337 engine.queue_change("k2", b"v2");
338 assert_eq!(engine.get_queued_count(), 2);
339 }
340
341 #[test]
342 fn flush_queue_drains_all() {
343 let mut engine = SyncEngine::new();
344 engine.queue_change("k1", b"v1");
345 engine.queue_change("k2", b"v2");
346 let flushed = engine.flush_queue();
347 assert_eq!(flushed.len(), 2);
348 assert_eq!(engine.get_queued_count(), 0);
349 }
350
351 #[test]
352 fn flush_queue_preserves_order() {
353 let mut engine = SyncEngine::new();
354 engine.queue_change("first", b"1");
355 engine.queue_change("second", b"2");
356 let flushed = engine.flush_queue();
357 assert_eq!(flushed[0].0, "first");
358 assert_eq!(flushed[1].0, "second");
359 }
360
361 #[test]
362 fn conflict_strategy_default_is_lww() {
363 let engine = SyncEngine::new();
364 assert_eq!(*engine.get_conflict_strategy(), ConflictStrategy::LastWriterWins);
365 }
366
367 #[test]
368 fn set_conflict_strategy_persists() {
369 let mut engine = SyncEngine::new();
370 engine.set_conflict_strategy(ConflictStrategy::Manual);
371 assert_eq!(*engine.get_conflict_strategy(), ConflictStrategy::Manual);
372 }
373
374 #[test]
375 fn enable_with_different_secrets_produces_different_keys() {
376 let mut e1 = SyncEngine::new();
377 let mut e2 = SyncEngine::new();
378 e1.enable("secret-one").unwrap();
379 e2.enable("secret-two").unwrap();
380 let ct = e1.encrypt(b"data").unwrap();
381 assert!(e2.decrypt(&ct).is_err());
382 }
383}