cinchcli-core 0.1.3

Shared client-side primitives for Cinch (cinchcli.com): generated wire DTOs, REST/WebSocket clients, AES-256-GCM + X25519 crypto, credential storage, local SQLite store, and sync helpers.
Documentation
//! Long-lived WS-driven sync writer.
//!
//! `Writer::start` acquires the advisory lockfile so only one writer runs per
//! machine, performs an initial REST backfill, then subscribes to the relay
//! WebSocket.  Incoming `new_clip` events are decrypted (when an encryption key
//! is available) and inserted into the local store.  On disconnect the loop
//! receives `WsStatus::Disconnected` and `ws::run` reconnects automatically
//! with exponential backoff.
//!
//! `Writer::shutdown` signals the background task to stop and awaits it.

use std::path::PathBuf;
use std::sync::Arc;

use tokio::sync::{mpsc, Notify};
use tokio::task::JoinHandle;
use tracing::warn;

use super::lockfile::{LockKind, Lockfile};
use super::{map, reader};
use crate::http::RestClient;
use crate::store::{queries, Store};
use crate::ws::{self, DecryptFailReason, WsConfig, WsEvent, WsStatus};

/// Long-lived WS-driven sync writer.  One per machine, coordinated by the
/// `~/.cinch/sync.lock` advisory lockfile.
///
/// Created by [`Writer::start`]; stop it with [`Writer::shutdown`].
pub struct Writer {
    /// Join handle of the spawned background task.  Wrapped in `Option` so
    /// that `shutdown` can take it out and await it.
    handle: Option<JoinHandle<()>>,
    /// Stop signal shared with the background task.
    stop: Arc<Notify>,
    /// Advisory lock guard, kept alive for as long as the `Writer` exists.
    _lock: Lockfile, // released on Drop
}

impl Writer {
    /// Try to start a writer.  Returns `Ok(None)` if another writer already
    /// holds the lock.
    ///
    /// The caller must supply a `WsConfig` (relay URL + bearer token +
    /// optional 32-byte AES key).  The same `RestClient` is used for the
    /// initial REST backfill.
    pub async fn start(
        store: Arc<Store>,
        client: Arc<RestClient>,
        ws_cfg: WsConfig,
        lock_path: PathBuf,
        kind: LockKind,
    ) -> std::io::Result<Option<Self>> {
        let lock = match Lockfile::try_acquire(&lock_path, kind)? {
            Some(l) => l,
            None => return Ok(None),
        };

        let stop = Arc::new(Notify::new());
        let stop_clone = stop.clone();
        let store_clone = store.clone();
        let client_clone = client.clone();
        let enc_key = ws_cfg.encryption_key;

        let handle = tokio::spawn(async move {
            // Initial REST backfill — bring the store up to date before
            // the WebSocket stream takes over.
            let _ = reader::backfill_once(
                &store_clone,
                &client_clone,
                reader::BackfillBudget::default(),
                enc_key.as_ref(),
            )
            .await;

            // Channel for WsEvent from the background ws::run task.
            let (tx, mut rx) = mpsc::channel::<WsEvent>(64);

            // ws::run already reconnects internally with exponential backoff,
            // so we just run it once and read from the channel until stop.
            let _ws_handle = tokio::spawn(ws::run(ws_cfg, tx));

            loop {
                tokio::select! {
                    _ = stop_clone.notified() => return,
                    maybe = rx.recv() => {
                        match maybe {
                            None => {
                                // Channel closed — ws::run exited (should not
                                // happen while the sender is alive).
                                return;
                            }
                            Some(event) => handle_ws_event(&store_clone, &client_clone, event, enc_key).await,
                        }
                    }
                }
            }
        });

        Ok(Some(Self {
            handle: Some(handle),
            stop,
            _lock: lock,
        }))
    }

    /// Signal the writer to stop and wait for it to finish.
    pub async fn shutdown(mut self) {
        self.stop.notify_waiters();
        if let Some(h) = self.handle.take() {
            let _ = h.await;
        }
    }
}

/// Process a single [`WsEvent`] from the relay subscription.
///
/// `NewClip` results in a store write. `KeyExchangeRequested` triggers
/// the ECDH bearer responder (when this device holds the encryption
/// key). Status changes are logged at debug level; decrypt failures
/// emit a warning. All other event kinds are no-ops pending Phase 5
/// handling.
async fn handle_ws_event(
    store: &Store,
    client: &RestClient,
    event: WsEvent,
    enc_key: Option<[u8; 32]>,
) {
    match event {
        WsEvent::NewClip { clip, plaintext: _ } => {
            // `ws::run` has already attempted decryption using the key supplied
            // in `WsConfig`.  After a successful decrypt `clip.encrypted` is
            // `false` and `clip.content` holds the decoded text (or base64 for
            // binary clips).  The `enc_key` was passed to `WsConfig` upstream so
            // decryption happens before the event reaches us here.
            let _ = enc_key; // consumed upstream by ws::decode_message via WsConfig
            let clip = *clip; // unbox
                              // After a successful decrypt, ws::run sets clip.encrypted = false
                              // and clip.content to the decoded string. If the clip is still
                              // marked encrypted here, the decrypt failed and we should not store
                              // ciphertext — the event type would be ClipDecryptFailed, not
                              // NewClip. So any NewClip with encrypted=false is safe to store.
            if clip.encrypted {
                // Should not happen: ws::run converts decrypt-failed clips to
                // WsEvent::ClipDecryptFailed. Guard here anyway.
                warn!(
                    clip_id = %clip.clip_id,
                    "writer: received NewClip with encrypted=true — skipping"
                );
                return;
            }
            // Use plaintext bytes for the content so binary clips (which have
            // base64 in clip.content after re-encoding) are stored as raw bytes.
            // The map function accepts clip.content as a String — for text clips
            // plaintext == clip.content.as_bytes(), so the lossy conversion is
            // a no-op.  For binary clips the store already accepts the base64
            // form (same as the REST path).
            match map::clip_wire_to_stored(&clip) {
                Ok(Some(stored)) => {
                    if let Err(e) = queries::insert_clip(store, &stored) {
                        warn!(clip_id = %stored.id, error = %e, "writer: insert_clip failed");
                    } else if let Err(e) = queries::set_watermark(store, &stored.id) {
                        warn!(clip_id = %stored.id, error = %e, "writer: set_watermark failed");
                    }
                }
                Ok(None) => {
                    // Empty clip_id — silently skip.
                }
                Err(e) => {
                    warn!(clip_id = %clip.clip_id, error = %e, "writer: map_wire_to_stored failed");
                }
            }
        }

        WsEvent::ClipDecryptFailed { clip_id, reason } => {
            let reason_str = match reason {
                DecryptFailReason::MissingKey => "no encryption key available".into(),
                DecryptFailReason::TagFailed(e) => format!("key mismatch: {e}"),
            };
            warn!(clip_id = %clip_id, reason = %reason_str, "writer: skipping encrypted clip — decrypt failed");
        }

        WsEvent::ClipDeleted { clip_id: _ } => {
            // TODO(phase 5): propagate deletion to local store.
        }

        WsEvent::Revoked { reason } => {
            warn!(
                reason = ?reason,
                "writer: device revoked by relay — writer will stop receiving events"
            );
        }

        WsEvent::TokenRotated {
            token: _,
            device_id: _,
        } => {
            // TODO(phase 5): persist the rotated token via credstore.
        }

        WsEvent::KeyExchangeRequested { device_id } => {
            log::info!(
                "writer: received KeyExchangeRequested device_id={:?}",
                device_id
            );
            let Some(did) = device_id else {
                log::info!("writer: key_exchange_requested missing device_id — skipping");
                return;
            };
            let Some(key) = enc_key else {
                log::info!(
                    "writer: cannot bear key — no encryption key on this device (target_device_id={})",
                    did
                );
                return;
            };
            log::info!("writer: bearing key for target_device_id={}", did);
            if let Err(e) = crate::key_exchange::handle_event(client, &did, &key).await {
                log::warn!(
                    "writer: key_exchange responder failed: target_device_id={}, error={}",
                    did,
                    e
                );
            } else {
                log::info!(
                    "writer: posted encrypted key bundle to peer target_device_id={}",
                    did
                );
            }
        }

        WsEvent::Status(WsStatus::Connected) => {
            tracing::debug!("writer: WS connected");
        }
        WsEvent::Status(WsStatus::Disconnected) => {
            tracing::debug!("writer: WS disconnected — ws::run will reconnect");
        }
        WsEvent::Status(WsStatus::Connecting) => {
            tracing::debug!("writer: WS connecting");
        }
    }
}