client_core/sync/writer.rs
1//! Long-lived WS-driven sync writer.
2//!
3//! `Writer::start` acquires the advisory lockfile so only one writer runs per
4//! machine, performs an initial REST backfill, then subscribes to the relay
5//! WebSocket. Incoming `new_clip` events are decrypted (when an encryption key
6//! is available) and inserted into the local store. On disconnect the loop
7//! receives `WsStatus::Disconnected` and `ws::run` reconnects automatically
8//! with exponential backoff.
9//!
10//! `Writer::shutdown` signals the background task to stop and awaits it.
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use tokio::sync::{mpsc, Notify};
16use tokio::task::JoinHandle;
17use tracing::warn;
18
19use super::lockfile::{LockKind, Lockfile};
20use super::{map, reader};
21use crate::http::RestClient;
22use crate::store::{queries, Store};
23use crate::ws::{self, DecryptFailReason, WsConfig, WsEvent, WsStatus};
24
/// Long-lived WS-driven sync writer. One per machine, coordinated by the
/// `~/.cinch/sync.lock` advisory lockfile.
///
/// Created by `Writer::start`, which spawns the background task, and torn
/// down by `Writer::shutdown`, which signals that task and awaits it.
pub struct Writer {
    /// Join handle of the background task spawned in `Writer::start`.
    /// Wrapped in `Option` so `shutdown` can `take()` it and await it.
    handle: Option<JoinHandle<()>>,
    /// Stop signal shared with the background task; the task returns once it
    /// observes a notification on it.
    stop: Arc<Notify>,
    /// Advisory lock guard. Never read — it is held only so the lock stays
    /// acquired for as long as this `Writer` exists.
    _lock: Lockfile, // released on Drop
}
32
33impl Writer {
34 /// Try to start a writer. Returns `Ok(None)` if another writer already
35 /// holds the lock.
36 ///
37 /// The caller must supply a `WsConfig` (relay URL + bearer token +
38 /// optional 32-byte AES key). The same `RestClient` is used for the
39 /// initial REST backfill.
40 pub async fn start(
41 store: Arc<Store>,
42 client: Arc<RestClient>,
43 ws_cfg: WsConfig,
44 lock_path: PathBuf,
45 kind: LockKind,
46 ) -> std::io::Result<Option<Self>> {
47 let lock = match Lockfile::try_acquire(&lock_path, kind)? {
48 Some(l) => l,
49 None => return Ok(None),
50 };
51
52 let stop = Arc::new(Notify::new());
53 let stop_clone = stop.clone();
54 let store_clone = store.clone();
55 let client_clone = client.clone();
56 let enc_key = ws_cfg.encryption_key;
57
58 let handle = tokio::spawn(async move {
59 // Initial REST backfill — bring the store up to date before
60 // the WebSocket stream takes over.
61 let _ = reader::backfill_once(
62 &store_clone,
63 &client_clone,
64 reader::BackfillBudget::default(),
65 enc_key.as_ref(),
66 )
67 .await;
68
69 // Channel for WsEvent from the background ws::run task.
70 let (tx, mut rx) = mpsc::channel::<WsEvent>(64);
71
72 // ws::run already reconnects internally with exponential backoff,
73 // so we just run it once and read from the channel until stop.
74 let _ws_handle = tokio::spawn(ws::run(ws_cfg, tx));
75
76 loop {
77 tokio::select! {
78 _ = stop_clone.notified() => return,
79 maybe = rx.recv() => {
80 match maybe {
81 None => {
82 // Channel closed — ws::run exited (should not
83 // happen while the sender is alive).
84 return;
85 }
86 Some(event) => handle_ws_event(&store_clone, event, enc_key).await,
87 }
88 }
89 }
90 }
91 });
92
93 Ok(Some(Self {
94 handle: Some(handle),
95 stop,
96 _lock: lock,
97 }))
98 }
99
100 /// Signal the writer to stop and wait for it to finish.
101 pub async fn shutdown(mut self) {
102 self.stop.notify_waiters();
103 if let Some(h) = self.handle.take() {
104 let _ = h.await;
105 }
106 }
107}
108
109/// Process a single [`WsEvent`] from the relay subscription.
110///
111/// Only `NewClip` results in a store write. Status changes are logged at
112/// debug level; decrypt failures emit a warning. All other event kinds are
113/// no-ops pending Phase 5 handling.
114async fn handle_ws_event(store: &Store, event: WsEvent, enc_key: Option<[u8; 32]>) {
115 match event {
116 WsEvent::NewClip { clip, plaintext: _ } => {
117 // `ws::run` has already attempted decryption using the key supplied
118 // in `WsConfig`. After a successful decrypt `clip.encrypted` is
119 // `false` and `clip.content` holds the decoded text (or base64 for
120 // binary clips). The `enc_key` was passed to `WsConfig` upstream so
121 // decryption happens before the event reaches us here.
122 let _ = enc_key; // consumed upstream by ws::decode_message via WsConfig
123 let clip = *clip; // unbox
124 // After a successful decrypt, ws::run sets clip.encrypted = false
125 // and clip.content to the decoded string. If the clip is still
126 // marked encrypted here, the decrypt failed and we should not store
127 // ciphertext — the event type would be ClipDecryptFailed, not
128 // NewClip. So any NewClip with encrypted=false is safe to store.
129 if clip.encrypted {
130 // Should not happen: ws::run converts decrypt-failed clips to
131 // WsEvent::ClipDecryptFailed. Guard here anyway.
132 warn!(
133 clip_id = %clip.clip_id,
134 "writer: received NewClip with encrypted=true — skipping"
135 );
136 return;
137 }
138 // Use plaintext bytes for the content so binary clips (which have
139 // base64 in clip.content after re-encoding) are stored as raw bytes.
140 // The map function accepts clip.content as a String — for text clips
141 // plaintext == clip.content.as_bytes(), so the lossy conversion is
142 // a no-op. For binary clips the store already accepts the base64
143 // form (same as the REST path).
144 match map::clip_wire_to_stored(&clip) {
145 Ok(Some(stored)) => {
146 if let Err(e) = queries::insert_clip(store, &stored) {
147 warn!(clip_id = %stored.id, error = %e, "writer: insert_clip failed");
148 } else if let Err(e) = queries::set_watermark(store, &stored.id) {
149 warn!(clip_id = %stored.id, error = %e, "writer: set_watermark failed");
150 }
151 }
152 Ok(None) => {
153 // Empty clip_id — silently skip.
154 }
155 Err(e) => {
156 warn!(clip_id = %clip.clip_id, error = %e, "writer: map_wire_to_stored failed");
157 }
158 }
159 }
160
161 WsEvent::ClipDecryptFailed { clip_id, reason } => {
162 let reason_str = match reason {
163 DecryptFailReason::MissingKey => "no encryption key available".into(),
164 DecryptFailReason::TagFailed(e) => format!("key mismatch: {e}"),
165 };
166 warn!(clip_id = %clip_id, reason = %reason_str, "writer: skipping encrypted clip — decrypt failed");
167 }
168
169 WsEvent::ClipDeleted { clip_id: _ } => {
170 // TODO(phase 5): propagate deletion to local store.
171 }
172
173 WsEvent::Revoked { reason } => {
174 warn!(
175 reason = ?reason,
176 "writer: device revoked by relay — writer will stop receiving events"
177 );
178 }
179
180 WsEvent::TokenRotated {
181 token: _,
182 device_id: _,
183 } => {
184 // TODO(phase 5): persist the rotated token via credstore.
185 }
186
187 WsEvent::KeyExchangeRequested { device_id: _ } => {
188 // Key-exchange responses are the desktop's responsibility; CLI
189 // watchers and the sync writer can safely ignore this event.
190 }
191
192 WsEvent::Status(WsStatus::Connected) => {
193 tracing::debug!("writer: WS connected");
194 }
195 WsEvent::Status(WsStatus::Disconnected) => {
196 tracing::debug!("writer: WS disconnected — ws::run will reconnect");
197 }
198 WsEvent::Status(WsStatus::Connecting) => {
199 tracing::debug!("writer: WS connecting");
200 }
201 }
202}