Skip to main content

client_core/sync/
writer.rs

1//! Long-lived WS-driven sync writer.
2//!
3//! `Writer::start` acquires the advisory lockfile so only one writer runs per
4//! machine, performs an initial REST backfill, then subscribes to the relay
5//! WebSocket.  Incoming `new_clip` events are decrypted (when an encryption key
6//! is available) and inserted into the local store.  On disconnect the loop
7//! receives `WsStatus::Disconnected` and `ws::run` reconnects automatically
8//! with exponential backoff.
9//!
10//! `Writer::shutdown` signals the background task to stop and awaits it.
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use tokio::sync::{mpsc, Notify};
16use tokio::task::JoinHandle;
17use tracing::warn;
18
19use super::lockfile::{LockKind, Lockfile};
20use super::{map, reader};
21use crate::http::RestClient;
22use crate::protocol::Clip;
23use crate::store::{queries, Store};
24use crate::ws::{self, DecryptFailReason, WsConfig, WsEvent, WsStatus};
25
/// Callback invoked for a remote `new_clip` event once the clip has been
/// inserted into the local store.
///
/// The closure receives the wire [`Clip`] that was just stored, so callers can
/// read its metadata without querying the store. It is only called when
/// `insert_clip` succeeded; a subsequent watermark-update failure is logged
/// but does not suppress the callback.
///
/// The callback runs synchronously inside the writer's event handling, so it
/// delays processing of the next event until it returns.
///
/// `Arc` makes the callback cheaply shareable; `Send + Sync` is required
/// because it is moved into the spawned background task.
pub type OnNewClipCallback = Arc<dyn Fn(&Clip) + Send + Sync>;
35
/// Long-lived WS-driven sync writer.  One per machine, coordinated by the
/// `~/.cinch/sync.lock` advisory lockfile.
///
/// Created by `Writer::start`; stopped by `Writer::shutdown`.
pub struct Writer {
    /// Join handle of the background task spawned in `start`. Wrapped in
    /// `Option` so `shutdown` can `take()` it and await the task.
    handle: Option<JoinHandle<()>>,
    /// Stop signal shared with the background task, which waits on
    /// `notified()` in its event loop.
    stop: Arc<Notify>,
    /// Advisory lock acquired in `start`; never read, only held for the
    /// lifetime of the `Writer`.
    _lock: Lockfile, // released on Drop
}
43
44impl Writer {
45    /// Try to start a writer.  Returns `Ok(None)` if another writer already
46    /// holds the lock.
47    ///
48    /// The caller must supply a `WsConfig` (relay URL + bearer token +
49    /// optional 32-byte AES key).  The same `RestClient` is used for the
50    /// initial REST backfill.
51    pub async fn start(
52        store: Arc<Store>,
53        client: Arc<RestClient>,
54        ws_cfg: WsConfig,
55        lock_path: PathBuf,
56        kind: LockKind,
57        on_new_clip: Option<OnNewClipCallback>,
58    ) -> std::io::Result<Option<Self>> {
59        let lock = match Lockfile::try_acquire(&lock_path, kind)? {
60            Some(l) => l,
61            None => return Ok(None),
62        };
63
64        let stop = Arc::new(Notify::new());
65        let stop_clone = stop.clone();
66        let store_clone = store.clone();
67        let client_clone = client.clone();
68        let enc_key = ws_cfg.encryption_key;
69
70        let handle = tokio::spawn(async move {
71            // Initial REST backfill — bring the store up to date before
72            // the WebSocket stream takes over.
73            let _ = reader::backfill_once(
74                &store_clone,
75                &client_clone,
76                reader::BackfillBudget::default(),
77                enc_key.as_ref(),
78            )
79            .await;
80
81            // Channel for WsEvent from the background ws::run task.
82            let (tx, mut rx) = mpsc::channel::<WsEvent>(64);
83
84            // ws::run already reconnects internally with exponential backoff,
85            // so we just run it once and read from the channel until stop.
86            let _ws_handle = tokio::spawn(ws::run(ws_cfg, tx));
87
88            loop {
89                tokio::select! {
90                    _ = stop_clone.notified() => return,
91                    maybe = rx.recv() => {
92                        match maybe {
93                            None => {
94                                // Channel closed — ws::run exited (should not
95                                // happen while the sender is alive).
96                                return;
97                            }
98                            Some(event) => handle_ws_event(&store_clone, &client_clone, event, enc_key, on_new_clip.as_ref()).await,
99                        }
100                    }
101                }
102            }
103        });
104
105        Ok(Some(Self {
106            handle: Some(handle),
107            stop,
108            _lock: lock,
109        }))
110    }
111
112    /// Signal the writer to stop and wait for it to finish.
113    pub async fn shutdown(mut self) {
114        self.stop.notify_waiters();
115        if let Some(h) = self.handle.take() {
116            let _ = h.await;
117        }
118    }
119}
120
121/// Process a single [`WsEvent`] from the relay subscription.
122///
123/// `NewClip` results in a store write. `KeyExchangeRequested` triggers
124/// the ECDH bearer responder (when this device holds the encryption
125/// key). Status changes are logged at debug level; decrypt failures
126/// emit a warning. All other event kinds are no-ops pending Phase 5
127/// handling.
128async fn handle_ws_event(
129    store: &Store,
130    client: &RestClient,
131    event: WsEvent,
132    enc_key: Option<[u8; 32]>,
133    on_new_clip: Option<&OnNewClipCallback>,
134) {
135    match event {
136        WsEvent::NewClip { clip, plaintext: _ } => {
137            // `ws::run` has already attempted decryption using the key supplied
138            // in `WsConfig`.  After a successful decrypt `clip.encrypted` is
139            // `false` and `clip.content` holds the decoded text (or base64 for
140            // binary clips).  The `enc_key` was passed to `WsConfig` upstream so
141            // decryption happens before the event reaches us here.
142            let _ = enc_key; // consumed upstream by ws::decode_message via WsConfig
143            let clip = *clip; // unbox
144                              // After a successful decrypt, ws::run sets clip.encrypted = false
145                              // and clip.content to the decoded string. If the clip is still
146                              // marked encrypted here, the decrypt failed and we should not store
147                              // ciphertext — the event type would be ClipDecryptFailed, not
148                              // NewClip. So any NewClip with encrypted=false is safe to store.
149            if clip.encrypted {
150                // Should not happen: ws::run converts decrypt-failed clips to
151                // WsEvent::ClipDecryptFailed. Guard here anyway.
152                warn!(
153                    clip_id = %clip.clip_id,
154                    "writer: received NewClip with encrypted=true — skipping"
155                );
156                return;
157            }
158            // Use plaintext bytes for the content so binary clips (which have
159            // base64 in clip.content after re-encoding) are stored as raw bytes.
160            // The map function accepts clip.content as a String — for text clips
161            // plaintext == clip.content.as_bytes(), so the lossy conversion is
162            // a no-op.  For binary clips the store already accepts the base64
163            // form (same as the REST path).
164            match map::clip_wire_to_stored(&clip) {
165                Ok(Some(stored)) => {
166                    let inserted = match queries::insert_clip(store, &stored) {
167                        Ok(()) => true,
168                        Err(e) => {
169                            warn!(clip_id = %stored.id, error = %e, "writer: insert_clip failed");
170                            false
171                        }
172                    };
173                    if inserted {
174                        if let Err(e) = queries::set_watermark(store, &stored.id) {
175                            warn!(clip_id = %stored.id, error = %e, "writer: set_watermark failed");
176                        }
177                        if let Some(cb) = on_new_clip {
178                            cb(&clip);
179                        }
180                    }
181                }
182                Ok(None) => {
183                    // Empty clip_id — silently skip.
184                }
185                Err(e) => {
186                    warn!(clip_id = %clip.clip_id, error = %e, "writer: map_wire_to_stored failed");
187                }
188            }
189        }
190
191        WsEvent::ClipDecryptFailed { clip_id, reason } => {
192            let reason_str = match reason {
193                DecryptFailReason::MissingKey => "no encryption key available".into(),
194                DecryptFailReason::TagFailed(e) => format!("key mismatch: {e}"),
195            };
196            warn!(clip_id = %clip_id, reason = %reason_str, "writer: skipping encrypted clip — decrypt failed");
197        }
198
199        WsEvent::ClipDeleted { clip_id: _ } => {
200            // TODO(phase 5): propagate deletion to local store.
201        }
202
203        WsEvent::Revoked { reason } => {
204            warn!(
205                reason = ?reason,
206                "writer: device revoked by relay — writer will stop receiving events"
207            );
208        }
209
210        WsEvent::TokenRotated {
211            token: _,
212            device_id: _,
213        } => {
214            // TODO(phase 5): persist the rotated token via credstore.
215        }
216
217        WsEvent::KeyExchangeRequested { device_id } => {
218            log::info!(
219                "writer: received KeyExchangeRequested device_id={:?}",
220                device_id
221            );
222            let Some(did) = device_id else {
223                log::info!("writer: key_exchange_requested missing device_id — skipping");
224                return;
225            };
226            let Some(key) = enc_key else {
227                log::info!(
228                    "writer: cannot bear key — no encryption key on this device (target_device_id={})",
229                    did
230                );
231                return;
232            };
233            log::info!("writer: bearing key for target_device_id={}", did);
234            if let Err(e) = crate::key_exchange::handle_event(client, &did, &key).await {
235                log::warn!(
236                    "writer: key_exchange responder failed: target_device_id={}, error={}",
237                    did,
238                    e
239                );
240            } else {
241                log::info!(
242                    "writer: posted encrypted key bundle to peer target_device_id={}",
243                    did
244                );
245            }
246        }
247
248        WsEvent::Status(WsStatus::Connected) => {
249            tracing::debug!("writer: WS connected");
250        }
251        WsEvent::Status(WsStatus::Disconnected) => {
252            tracing::debug!("writer: WS disconnected — ws::run will reconnect");
253        }
254        WsEvent::Status(WsStatus::Connecting) => {
255            tracing::debug!("writer: WS connecting");
256        }
257    }
258}