use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, Notify};
use tokio::task::JoinHandle;
use tracing::warn;
use super::lockfile::{LockKind, Lockfile};
use super::{map, reader};
use crate::http::RestClient;
use crate::store::{queries, Store};
use crate::ws::{self, DecryptFailReason, WsConfig, WsEvent, WsStatus};
/// Handle to the background writer task spawned by `Writer::start`.
///
/// Field order matters: Rust drops fields in declaration order, so the task
/// handle and the stop notifier are dropped before the lockfile.
pub struct Writer {
/// Join handle of the spawned writer task; `shutdown` takes it out of the
/// `Option` in order to await the task.
handle: Option<JoinHandle<()>>,
/// Stop signal shared with the writer task; the task's event loop returns
/// once it observes a notification on it.
stop: Arc<Notify>,
/// Lockfile acquired in `start`. Never read; it is only stored so that it
/// lives as long as this `Writer` (presumably released on drop — confirm in
/// `Lockfile`).
_lock: Lockfile, }
impl Writer {
pub async fn start(
store: Arc<Store>,
client: Arc<RestClient>,
ws_cfg: WsConfig,
lock_path: PathBuf,
kind: LockKind,
) -> std::io::Result<Option<Self>> {
let lock = match Lockfile::try_acquire(&lock_path, kind)? {
Some(l) => l,
None => return Ok(None),
};
let stop = Arc::new(Notify::new());
let stop_clone = stop.clone();
let store_clone = store.clone();
let client_clone = client.clone();
let enc_key = ws_cfg.encryption_key;
let handle = tokio::spawn(async move {
let _ = reader::backfill_once(
&store_clone,
&client_clone,
reader::BackfillBudget::default(),
enc_key.as_ref(),
)
.await;
let (tx, mut rx) = mpsc::channel::<WsEvent>(64);
let _ws_handle = tokio::spawn(ws::run(ws_cfg, tx));
loop {
tokio::select! {
_ = stop_clone.notified() => return,
maybe = rx.recv() => {
match maybe {
None => {
return;
}
Some(event) => handle_ws_event(&store_clone, &client_clone, event, enc_key).await,
}
}
}
}
});
Ok(Some(Self {
handle: Some(handle),
stop,
_lock: lock,
}))
}
pub async fn shutdown(mut self) {
self.stop.notify_waiters();
if let Some(h) = self.handle.take() {
let _ = h.await;
}
}
}
async fn handle_ws_event(
store: &Store,
client: &RestClient,
event: WsEvent,
enc_key: Option<[u8; 32]>,
) {
match event {
WsEvent::NewClip { clip, plaintext: _ } => {
let _ = enc_key; let clip = *clip; if clip.encrypted {
warn!(
clip_id = %clip.clip_id,
"writer: received NewClip with encrypted=true — skipping"
);
return;
}
match map::clip_wire_to_stored(&clip) {
Ok(Some(stored)) => {
if let Err(e) = queries::insert_clip(store, &stored) {
warn!(clip_id = %stored.id, error = %e, "writer: insert_clip failed");
} else if let Err(e) = queries::set_watermark(store, &stored.id) {
warn!(clip_id = %stored.id, error = %e, "writer: set_watermark failed");
}
}
Ok(None) => {
}
Err(e) => {
warn!(clip_id = %clip.clip_id, error = %e, "writer: map_wire_to_stored failed");
}
}
}
WsEvent::ClipDecryptFailed { clip_id, reason } => {
let reason_str = match reason {
DecryptFailReason::MissingKey => "no encryption key available".into(),
DecryptFailReason::TagFailed(e) => format!("key mismatch: {e}"),
};
warn!(clip_id = %clip_id, reason = %reason_str, "writer: skipping encrypted clip — decrypt failed");
}
WsEvent::ClipDeleted { clip_id: _ } => {
}
WsEvent::Revoked { reason } => {
warn!(
reason = ?reason,
"writer: device revoked by relay — writer will stop receiving events"
);
}
WsEvent::TokenRotated {
token: _,
device_id: _,
} => {
}
WsEvent::KeyExchangeRequested { device_id } => {
let Some(did) = device_id else {
tracing::debug!("writer: key_exchange_requested missing device_id — skipping");
return;
};
let Some(key) = enc_key else {
tracing::debug!(
target_device_id = %did,
"writer: cannot bear key — no encryption key on this device"
);
return;
};
if let Err(e) = crate::key_exchange::handle_event(client, &did, &key).await {
warn!(
target_device_id = %did,
error = %e,
"writer: key_exchange responder failed"
);
} else {
tracing::debug!(
target_device_id = %did,
"writer: posted encrypted key bundle to peer"
);
}
}
WsEvent::Status(WsStatus::Connected) => {
tracing::debug!("writer: WS connected");
}
WsEvent::Status(WsStatus::Disconnected) => {
tracing::debug!("writer: WS disconnected — ws::run will reconnect");
}
WsEvent::Status(WsStatus::Connecting) => {
tracing::debug!("writer: WS connecting");
}
}
}