// cloudillo_crdt/websocket.rs
1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! WebSocket CRDT Handler - Collaborative Document Editing
5//!
6//! The CRDT protocol (`/ws/crdt/:doc_id`) provides real-time collaborative editing
7//! using Yjs conflict-free replicated data types.
8//!
9//! Message Format (Binary):
10//! Messages use the Yjs sync protocol format directly (lib0 encoding):
11//! - MSG_SYNC (0): Sync protocol messages (SyncStep1, SyncStep2, Update)
12//! - MSG_AWARENESS (1): User presence/cursor updates
13//!
14//! All messages are encoded/decoded using yrs::sync::Message.
15
16use crate::prelude::*;
17use axum::extract::ws::{Message, WebSocket};
18use futures::sink::SinkExt;
19use futures::stream::SplitSink;
20use futures::stream::StreamExt;
21use std::collections::HashMap;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::time::Instant;
25use tokio::sync::Mutex;
26use yrs::sync::{Message as YMessage, SyncMessage};
27use yrs::updates::decoder::Decode;
28use yrs::updates::encoder::Encode;
29use yrs::{Doc, Map, ReadTxn, StateVector, Transact, Update};
30
/// Throttle interval for access/modification tracking (60 seconds)
///
/// Caps how often `record_file_access_throttled` / `record_file_modification_throttled`
/// hit the meta adapter per connection.
const TRACKING_THROTTLE_SECS: u64 = 60;
33
/// Lossy `usize` → `f64` conversion.
///
/// Values above 2^53 lose precision; this is only used for byte-size
/// percentage calculations where exactness is not required.
#[allow(clippy::cast_precision_loss)]
fn usize_to_f64(n: usize) -> f64 {
	n as f64
}
41
/// CRDT connection tracking
///
/// One instance per WebSocket connection. The channel and `doc` fields are
/// shared with the per-document `DocState` registry entry.
struct CrdtConnection {
	conn_id: String, // Unique connection ID (to distinguish multiple tabs from same user)
	user_id: String, // User this connection belongs to
	doc_id: String,  // Document this connection edits
	tn_id: TnId,     // Tenant identifier
	// Broadcast channel for awareness updates (conn_id, raw_awareness_data)
	awareness_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
	// Broadcast channel for sync updates (conn_id, raw_sync_data)
	sync_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
	// Live Y.Doc kept in memory for instant state vector / diff computation
	doc: Arc<Mutex<Doc>>,
	// User activity tracking state (throttled)
	last_access_update: Mutex<Option<Instant>>,
	last_modify_update: Mutex<Option<Instant>>,
	// Set once this session stores any update; read on close to emit a
	// final modification record (see record_final_activity)
	has_modified: AtomicBool,
}
59
/// Per-document state: broadcast channels + live Y.Doc
///
/// Cloning is cheap — every field is an `Arc`, so clones share the same
/// channels and document instance.
#[derive(Clone)]
struct DocState {
	// Fan-out for awareness (presence) payloads: (sender conn_id, raw bytes)
	awareness_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
	// Fan-out for sync (document update) payloads: (sender conn_id, raw bytes)
	sync_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
	// In-memory Y.Doc with all persisted updates applied
	doc: Arc<Mutex<Doc>>,
}
67
/// Type alias for the CRDT document registry
///
/// Maps doc_id → `DocState` for every document with at least one active
/// connection (or one within the post-disconnect grace period).
type CrdtDocRegistry = tokio::sync::RwLock<HashMap<String, DocState>>;

// Global registry of CRDT documents and their connections.
// Entries are inserted on first connection and removed under a write lock
// only after the last connection closes and the grace period elapses.
static CRDT_DOCS: std::sync::LazyLock<CrdtDocRegistry> =
	std::sync::LazyLock::new(|| tokio::sync::RwLock::new(HashMap::new()));
74
/// Handle a CRDT connection
///
/// Drives the full lifecycle of one WebSocket client on `doc_id`: registry
/// lookup/initialization, initial SyncStep1 handshake, background tasks
/// (heartbeat, receive, SYNC/AWARENESS forwarding), and teardown with
/// optional document compaction once the last connection closes.
///
/// The `read_only` parameter controls whether this connection can send updates.
/// Read-only connections can receive sync messages and awareness updates,
/// but their Update messages will be rejected.
///
/// SECURITY TODO: Access level is checked once at connection time but not re-validated.
/// If a user's access is revoked (e.g., FSHR action deleted), they keep their original
/// access level until reconnection. Consider adding periodic re-validation (every 30s
/// or 100 messages) to enforce access revocation mid-session.
pub async fn handle_crdt_connection(
	ws: WebSocket,
	user_id: String,
	doc_id: String,
	app: App,
	tn_id: TnId,
	read_only: bool,
) {
	// Generate unique connection ID
	let conn_id =
		cloudillo_types::utils::random_id().unwrap_or_else(|_| format!("conn-{}", now_timestamp()));
	info!("CRDT connection: {} / {} (tn_id={}, conn_id={})", user_id, doc_id, tn_id.0, conn_id);

	// Get or create per-document state (broadcast channels + live Y.Doc).
	// We check with a read lock first (fast path for existing docs), then
	// acquire a write lock to insert if missing. The write lock is held
	// across load_or_init_doc so that only one connection ever initializes
	// a given document (avoids duplicate initial updates for new docs).
	let doc_state = {
		let docs = CRDT_DOCS.read().await;
		docs.get(&doc_id).cloned()
	};
	let doc_state = if let Some(state) = doc_state {
		state
	} else {
		let mut docs = CRDT_DOCS.write().await;
		// Re-check: another connection may have inserted while we waited
		if let Some(state) = docs.get(&doc_id) {
			state.clone()
		} else {
			let live_doc = match load_or_init_doc(&app, tn_id, &doc_id).await {
				Ok(doc) => doc,
				Err(e) => {
					warn!("Failed to load doc {}, closing connection: {}", doc_id, e);
					return;
				}
			};
			// Channel capacity 256: slower receivers beyond this lag and drop
			// messages (handled in spawn_broadcast_task).
			let (awareness_tx, _) = tokio::sync::broadcast::channel(256);
			let (sync_tx, _) = tokio::sync::broadcast::channel(256);
			let state = DocState {
				awareness_tx: Arc::new(awareness_tx),
				sync_tx: Arc::new(sync_tx),
				doc: Arc::new(Mutex::new(live_doc)),
			};
			docs.insert(doc_id.clone(), state.clone());
			state
		}
	};

	// Per-connection tracking object sharing the doc-level channels/Doc.
	let conn = Arc::new(CrdtConnection {
		conn_id: conn_id.clone(),
		user_id: user_id.clone(),
		doc_id: doc_id.clone(),
		tn_id,
		awareness_tx: doc_state.awareness_tx,
		sync_tx: doc_state.sync_tx,
		doc: doc_state.doc,
		last_access_update: Mutex::new(None),
		last_modify_update: Mutex::new(None),
		has_modified: AtomicBool::new(false),
	});

	// Record initial file access (throttled)
	record_file_access_throttled(&app, &conn).await;

	// Split WebSocket for concurrent operations
	let (ws_tx, ws_rx) = ws.split();
	let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));

	// Send server's SyncStep1 (state vector from live doc — instant, no DB read)
	{
		let doc_guard = conn.doc.lock().await;
		let sv = doc_guard.transact().state_vector();
		drop(doc_guard);
		let y_msg = YMessage::Sync(SyncMessage::SyncStep1(sv));
		let encoded = y_msg.encode_v1();
		info!("Sent SyncStep1 to {} for doc {} ({} bytes)", user_id, doc_id, encoded.len());
		let mut tx = ws_tx.lock().await;
		if let Err(e) = tx.send(Message::Binary(encoded.into())).await {
			warn!("Failed to send SyncStep1 to {}: {}", user_id, e);
		}
	}

	// Heartbeat task - sends ping frames to keep connection alive
	let heartbeat_task = spawn_heartbeat_task(user_id.clone(), ws_tx.clone());

	// WebSocket receive task - handles incoming messages
	let ws_recv_task =
		spawn_receive_task(conn.clone(), ws_tx.clone(), ws_rx, app.clone(), tn_id, read_only);

	// Sync broadcast task - forwards CRDT updates to other clients.
	// NOTE(review): subscribe() happens here, after SyncStep1 — updates broadcast
	// between registry insertion and this point are not received by this
	// connection; presumably covered by the sync handshake — confirm.
	let sync_task =
		spawn_broadcast_task(conn.clone(), ws_tx.clone(), conn.sync_tx.subscribe(), "SYNC");

	// Awareness broadcast task - forwards awareness updates to other clients
	let awareness_task = spawn_broadcast_task(
		conn.clone(),
		ws_tx.clone(),
		conn.awareness_tx.subscribe(),
		"AWARENESS",
	);

	// Wait for WebSocket receive task to complete (client disconnected)
	// We don't need to select on all tasks - the ws_recv_task is the one that matters
	let _ = ws_recv_task.await;
	debug!("WebSocket receive task ended");

	// Record final file activity before closing
	record_final_activity(&app, &conn).await;

	// Abort all other tasks to ensure cleanup
	info!("CRDT connection closing for {}, aborting tasks...", user_id);
	heartbeat_task.abort();
	sync_task.abort();
	awareness_task.abort();

	// Wait for aborted tasks to fully clean up (drop their receivers)
	// We can ignore the JoinError since we just aborted them
	let _ = tokio::join!(heartbeat_task, sync_task, awareness_task);
	info!("CRDT connection closed: {} (all tasks cleaned up)", user_id);

	// Always log document statistics on close
	log_doc_statistics(&app, tn_id, &conn.doc_id).await;

	// Check if this was the last connection (read-only check).
	// We do NOT remove from the registry yet — a reconnecting client during the
	// grace period must find the existing DocState (with the live Doc), not create
	// a fresh one.
	if is_last_connection(&conn.doc_id).await {
		info!("Last connection closed for doc {}, waiting before optimization...", conn.doc_id);

		// Wait a grace period to ensure:
		// 1. No new connections are in the process of being established
		// 2. All concurrent disconnections have completed
		// 3. No pending updates are still being processed
		tokio::time::sleep(std::time::Duration::from_secs(2)).await;

		// Acquire write lock, re-check, and only then remove + extract DocState.
		// This avoids TOCTOU: if a new connection joined during the grace period
		// it will have receivers on the existing DocState, so we skip removal.
		let removed = {
			let mut docs = CRDT_DOCS.write().await;
			if let Some(state) = docs.get(&conn.doc_id) {
				if state.awareness_tx.receiver_count() == 0 && state.sync_tx.receiver_count() == 0 {
					docs.remove(&conn.doc_id)
				} else {
					None
				}
			} else {
				None
			}
		};

		if let Some(doc_state) = removed {
			info!(
				"Confirmed no active connections for doc {}, proceeding with optimization",
				conn.doc_id
			);
			optimize_document(&app, tn_id, &conn.doc_id, &doc_state.doc).await;
		} else {
			info!(
				"New connection established for doc {} during grace period, skipping optimization",
				conn.doc_id
			);
		}
	}
}
252
/// Load a Y.Doc from stored updates, or initialize a new one if the document is empty.
///
/// Called once per document when the first connection opens. The returned Doc is kept
/// in-memory in the `CRDT_DOCS` registry for the lifetime of the document's connections.
///
/// Reconstruction from stored updates runs on the worker pool; individually
/// corrupt updates are skipped with a warning rather than failing the load.
async fn load_or_init_doc(app: &App, tn_id: TnId, doc_id: &str) -> ClResult<Doc> {
	let updates = app.crdt_adapter.get_updates(tn_id, doc_id).await?;

	if updates.is_empty() {
		info!("Document {} not initialized, creating initial structure", doc_id);
		let doc = Doc::new();
		// NOTE(review): the "i" entry looks like an "initialized" marker in the
		// meta map — confirm against the client-side schema.
		let meta = doc.get_or_insert_map("meta");
		{
			let mut txn = doc.transact_mut();
			meta.insert(&mut txn, "i", true);
		}

		// Persist the initial update
		let initial_data = doc.transact().encode_state_as_update_v1(&StateVector::default());
		if !initial_data.is_empty() {
			let update = cloudillo_types::crdt_adapter::CrdtUpdate::with_client(
				initial_data,
				"system".to_string(),
			);
			if let Err(e) = app.crdt_adapter.store_update(tn_id, doc_id, update).await {
				warn!("Failed to store initial CRDT update for doc {}: {}", doc_id, e);
			} else {
				info!("Document {} initialized", doc_id);
			}
		}
		Ok(doc)
	} else {
		let total_bytes: usize = updates.iter().map(|u| u.data.len()).sum();
		info!("Loading {} CRDT updates for doc {} ({} bytes)", updates.len(), doc_id, total_bytes);
		// Clone raw update bytes so the closure can own them (it is moved to
		// the worker pool and must be 'static).
		let updates_data: Vec<Vec<u8>> = updates.iter().map(|u| u.data.clone()).collect();
		let doc_id_owned = doc_id.to_string();
		match app
			.worker
			.run_immed(move || {
				let doc = Doc::new();
				{
					let mut txn = doc.transact_mut();
					// Apply updates in stored order; decode/apply failures are
					// logged per-update and skipped.
					for (idx, data) in updates_data.iter().enumerate() {
						match Update::decode_v1(data) {
							Ok(update) => {
								if let Err(e) = txn.apply_update(update) {
									warn!(
										"Update #{} for doc {} failed to apply: {}",
										idx, doc_id_owned, e
									);
								}
							}
							Err(e) => {
								warn!(
									"Update #{} for doc {} failed to decode: {}",
									idx, doc_id_owned, e
								);
							}
						}
					}
				}
				doc
			})
			.await
		{
			Ok(doc) => Ok(doc),
			Err(e) => {
				warn!("Worker pool failed loading doc {}: {}", doc_id, e);
				Err(Error::Internal(format!("Worker pool failed loading doc {}", doc_id)))
			}
		}
	}
}
325
326/// Spawn heartbeat task that sends ping frames periodically
327fn spawn_heartbeat_task(
328	user_id: String,
329	ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
330) -> tokio::task::JoinHandle<()> {
331	tokio::spawn(async move {
332		let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
333		loop {
334			interval.tick().await;
335			debug!("CRDT heartbeat: {}", user_id);
336
337			let mut tx = ws_tx.lock().await;
338			if tx.send(Message::Ping(vec![].into())).await.is_err() {
339				debug!("Client disconnected during heartbeat");
340				return;
341			}
342		}
343	})
344}
345
346/// Spawn WebSocket receive task that handles incoming messages
347fn spawn_receive_task(
348	conn: Arc<CrdtConnection>,
349	ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
350	ws_rx: futures::stream::SplitStream<WebSocket>,
351	app: App,
352	tn_id: TnId,
353	read_only: bool,
354) -> tokio::task::JoinHandle<()> {
355	tokio::spawn(async move {
356		let mut ws_rx = ws_rx;
357		while let Some(msg) = ws_rx.next().await {
358			match msg {
359				Ok(Message::Binary(data)) => {
360					// yrs messages are sent directly without our wrapper
361					handle_yrs_message(&conn, &data, &ws_tx, &app, tn_id, read_only).await;
362				}
363				Ok(Message::Close(_) | Message::Ping(_) | Message::Pong(_)) => {
364					// Ignore control frames
365				}
366				Ok(_) => {
367					warn!("Received non-binary WebSocket message");
368				}
369				Err(e) => {
370					warn!("CRDT connection error: {}", e);
371					break;
372				}
373			}
374		}
375	})
376}
377
/// Spawn a generic broadcast task that forwards updates to other clients
/// This handles both SYNC and AWARENESS broadcasts with the same logic
///
/// Receives (sender_conn_id, payload) pairs from the per-document broadcast
/// channel and writes each payload to this connection's WebSocket, skipping
/// messages this connection itself sent (those are echoed separately by the
/// message handler).
fn spawn_broadcast_task(
	conn: Arc<CrdtConnection>,
	ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
	mut rx: tokio::sync::broadcast::Receiver<(String, Vec<u8>)>,
	label: &'static str,
) -> tokio::task::JoinHandle<()> {
	tokio::spawn(async move {
		debug!(
			"Connection {} (user {}) subscribed to {} broadcasts for doc {}",
			conn.conn_id, conn.user_id, label, conn.doc_id
		);

		loop {
			match rx.recv().await {
				Ok((sender_conn_id, data)) => {
					debug!(
						"{} broadcast received by conn {}: from conn {} for doc {} ({} bytes)",
						label,
						conn.conn_id,
						sender_conn_id,
						conn.doc_id,
						data.len()
					);

					// Skip if this is from the current connection (already echoed)
					if sender_conn_id == conn.conn_id {
						debug!("Skipping {} echo to self for conn {}", label, conn.conn_id);
						continue;
					}

					// Forward update to this client (data is already yrs-encoded, send directly)
					let ws_msg = Message::Binary(data.into());

					debug!(
						"Forwarding {} update from conn {} to conn {} (user {}) for doc {}",
						label, sender_conn_id, conn.conn_id, conn.user_id, conn.doc_id
					);

					let mut tx = ws_tx.lock().await;
					if tx.send(ws_msg).await.is_err() {
						debug!("Client disconnected while forwarding {} update", label);
						return;
					}
					debug!("{} update successfully forwarded to conn {}", label, conn.conn_id);
				}
				// Lagged: this receiver was too slow and the channel dropped
				// messages. Missed SYNC updates are worth a warning; missed
				// AWARENESS (presence) updates are transient and harmless.
				Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
					if label == "SYNC" {
						warn!(
							"Client {} lagged behind on {} updates for doc {}",
							conn.user_id, label, conn.doc_id
						);
					} else {
						debug!("Connection {} lagged on {} updates", conn.conn_id, label);
					}
				}
				// Closed: all senders dropped — the DocState was torn down.
				Err(tokio::sync::broadcast::error::RecvError::Closed) => {
					debug!("{} broadcast channel closed", label);
					return;
				}
			}
		}
	})
}
443
444/// Broadcast a message and log the result
445fn broadcast_message(
446	tx: &Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
447	conn_id: &str,
448	user_id: &str,
449	doc_id: &str,
450	payload: Vec<u8>,
451	label: &str,
452) {
453	match tx.send((conn_id.to_string(), payload)) {
454		Ok(receiver_count) => {
455			if label != "AWARENESS" {
456				info!(
457					"CRDT {} broadcast from conn {} (user {}) for doc {}: {} receivers",
458					label, conn_id, user_id, doc_id, receiver_count
459				);
460			}
461		}
462		Err(_) => {
463			debug!("CRDT {} broadcast failed - no receivers for doc {}", label, doc_id);
464		}
465	}
466}
467
468/// Send raw echo response back to the client (yrs-encoded data)
469async fn send_echo_raw(
470	ws_tx: &Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
471	conn_id: &str,
472	user_id: &str,
473	doc_id: &str,
474	payload: &[u8],
475	label: &str,
476) {
477	let ws_msg = Message::Binary(payload.to_vec().into());
478	let mut tx = ws_tx.lock().await;
479
480	match tx.send(ws_msg).await {
481		Ok(()) => {
482			debug!(
483				"CRDT {} echo sent back to conn {} (user {}) for doc {} ({} bytes)",
484				label,
485				conn_id,
486				user_id,
487				doc_id,
488				payload.len()
489			);
490		}
491		Err(e) => {
492			warn!("Failed to send CRDT {} echo to conn {}: {}", label, conn_id, e);
493		}
494	}
495}
496
/// Decode, apply to the live Doc, and persist an update from a client.
///
/// Returns `true` if the update was successfully stored (caller should broadcast),
/// or `false` if it was rejected/skipped/failed (caller should return early).
async fn apply_and_store(
	app: &App,
	tn_id: TnId,
	conn: &Arc<CrdtConnection>,
	update_data: &[u8],
	read_only: bool,
	msg_type: &str,
) -> bool {
	// Read-only connections may observe and sync, but never mutate.
	if read_only {
		debug!(
			"Ignoring {} from read-only conn {} for doc {}",
			msg_type, conn.conn_id, conn.doc_id
		);
		return false;
	}
	if update_data.is_empty() {
		debug!("Received empty {} from conn {}", msg_type, conn.conn_id);
		return false;
	}

	// Apply to the live doc first to detect no-ops (e.g., SyncStep2 with only
	// redundant delete-set metadata). yrs::Update is !Send so we must decode
	// inside the lock scope.
	let is_noop = {
		let doc_guard = conn.doc.lock().await;
		let snapshot_before = doc_guard.transact().snapshot();
		match Update::decode_v1(update_data) {
			Ok(decoded) => {
				if let Err(e) = doc_guard.transact_mut().apply_update(decoded) {
					warn!("Failed to apply {} to live doc {}: {}", msg_type, conn.doc_id, e);
					return false;
				}
			}
			Err(e) => {
				warn!(
					"Rejecting malformed {} from conn {} - decode failed: {}",
					msg_type, conn.conn_id, e
				);
				return false;
			}
		}
		// Equal snapshots before/after mean the update changed nothing
		// observable, so there is no point persisting it.
		let snapshot_after = doc_guard.transact().snapshot();
		snapshot_before == snapshot_after
	};

	if is_noop {
		debug!(
			"{} is a no-op for doc {} ({} bytes) - skipping persist",
			msg_type,
			conn.doc_id,
			update_data.len()
		);
		return false;
	}

	// Persist to DB — the live doc is already updated. On persist failure the
	// live doc is ahead of DB, but this self-corrects: compaction on close
	// will persist the full merged state.
	let update = cloudillo_types::crdt_adapter::CrdtUpdate::with_client(
		update_data.to_vec(),
		conn.user_id.clone(),
	);
	if let Err(e) = app.crdt_adapter.store_update(tn_id, &conn.doc_id, update).await {
		warn!(
			"{} FAILED to store for doc {}: {} - live doc is ahead of DB",
			msg_type, conn.doc_id, e
		);
		return false;
	}

	info!(
		"{} stored for doc {} from user {} ({} bytes)",
		msg_type,
		conn.doc_id,
		conn.user_id,
		update_data.len()
	);
	// Mark this session as having modified the doc (throttled DB write).
	record_file_modification_throttled(app, conn).await;
	true
}
583
/// Handle a single yrs-encoded (lib0 v1) binary message from a client.
///
/// The `read_only` parameter controls whether Update messages are accepted.
/// Read-only connections can still receive SyncStep1/2 for initial sync,
/// but their Update messages (actual edits) will be rejected.
async fn handle_yrs_message(
	conn: &Arc<CrdtConnection>,
	data: &[u8],
	ws_tx: &Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
	app: &App,
	tn_id: TnId,
	read_only: bool,
) {
	if data.is_empty() {
		warn!("Empty message from conn {}", conn.conn_id);
		return;
	}

	// Decode using yrs
	match YMessage::decode_v1(data) {
		Ok(YMessage::Sync(sync_msg)) => {
			debug!(
				"CRDT SYNC message from conn {} (user {}) for doc {}: {:?}",
				conn.conn_id,
				conn.user_id,
				conn.doc_id,
				match &sync_msg {
					SyncMessage::SyncStep1(_) => "SyncStep1",
					SyncMessage::SyncStep2(_) => "SyncStep2",
					SyncMessage::Update(_) => "Update",
				}
			);

			// Handle each sync message type according to the y-sync protocol.
			// Only SyncStep2 and Update messages that are successfully stored
			// should be broadcast+echoed. SyncStep1, read-only rejections, empty
			// messages, and store failures must return early to avoid broadcast.
			match &sync_msg {
				SyncMessage::SyncStep1(client_sv) => {
					// Client sent its state vector — respond with SyncStep2 (updates the
					// client is missing). Computed instantly from the live in-memory Doc.
					info!(
						"Received SyncStep1 from conn {} (user {}) for doc {} ({} bytes)",
						conn.conn_id,
						conn.user_id,
						conn.doc_id,
						data.len()
					);
					let doc_guard = conn.doc.lock().await;
					let server_sv = doc_guard.transact().state_vector();
					debug!(
						"SV comparison for doc {}: server={} clients, client={} clients",
						conn.doc_id,
						server_sv.len(),
						client_sv.len()
					);
					// Diff contains everything the client is missing relative
					// to its advertised state vector.
					let diff = doc_guard.transact().encode_state_as_update_v1(client_sv);
					drop(doc_guard);

					let mut tx = ws_tx.lock().await;
					let msg = YMessage::Sync(SyncMessage::SyncStep2(diff.clone()));
					if let Err(e) = tx.send(Message::Binary(msg.encode_v1().into())).await {
						warn!("Failed to send SyncStep2 to {}: {}", conn.user_id, e);
					} else {
						info!(
							"Sent SyncStep2 to conn {} for doc {} ({} bytes)",
							conn.conn_id,
							conn.doc_id,
							diff.len()
						);
					}
					// Handshake response only — nothing to broadcast or echo.
					return;
				}
				SyncMessage::SyncStep2(update_data) => {
					// SyncStep2 from client may contain redundant data (the
					// client's full state diff). We persist it like a normal
					// update — yrs handles duplicates idempotently, and
					// compaction merges everything on close.
					info!(
						"Received SyncStep2 from conn {} (user {}) for doc {} ({} bytes)",
						conn.conn_id,
						conn.user_id,
						conn.doc_id,
						update_data.len()
					);
					if !apply_and_store(app, tn_id, conn, update_data, read_only, "SyncStep2").await
					{
						return;
					}
				}
				SyncMessage::Update(update_data) => {
					if !apply_and_store(app, tn_id, conn, update_data, read_only, "Update").await {
						return;
					}
				}
			}

			// Broadcast successfully stored updates to other clients.
			// SyncStep2 data must be re-encoded as Update for protocol conformance:
			// SyncStep2 is a handshake response, not a live update message.
			let broadcast_data = match &sync_msg {
				SyncMessage::SyncStep2(update_data) => {
					let msg = YMessage::Sync(SyncMessage::Update(update_data.clone()));
					msg.encode_v1()
				}
				_ => data.to_vec(),
			};

			broadcast_message(
				&conn.sync_tx,
				&conn.conn_id,
				&conn.user_id,
				&conn.doc_id,
				broadcast_data.clone(),
				"SYNC",
			);

			// Echo back to sender as keepalive (y-websocket disconnects after 30s
			// without data messages; PING frames don't count as they bypass onmessage).
			// The echoed data is harmless: the client already has it and processes as no-op.
			send_echo_raw(
				ws_tx,
				&conn.conn_id,
				&conn.user_id,
				&conn.doc_id,
				&broadcast_data,
				"SYNC",
			)
			.await;
		}
		Ok(YMessage::Awareness(_awareness_update)) => {
			debug!(
				"CRDT AWARENESS from conn {} (user {}) for doc {} ({} bytes)",
				conn.conn_id,
				conn.user_id,
				conn.doc_id,
				data.len()
			);

			// Broadcast to other clients
			broadcast_message(
				&conn.awareness_tx,
				&conn.conn_id,
				&conn.user_id,
				&conn.doc_id,
				data.to_vec(),
				"AWARENESS",
			);

			// Echo back to sender
			send_echo_raw(ws_tx, &conn.conn_id, &conn.user_id, &conn.doc_id, data, "AWARENESS")
				.await;
		}
		Ok(other) => {
			// Other protocol messages (e.g. auth/query-awareness) are ignored.
			debug!("Received non-sync/awareness message: {:?}", other);
		}
		Err(e) => {
			warn!("Failed to decode yrs message from conn {}: {}", conn.conn_id, e);
		}
	}
}
743
744/// Log document statistics (update count and total size)
745async fn log_doc_statistics(app: &App, tn_id: TnId, doc_id: &str) {
746	match app.crdt_adapter.get_updates(tn_id, doc_id).await {
747		Ok(updates) => {
748			let update_count = updates.len();
749			let total_size: usize = updates.iter().map(|u| u.data.len()).sum();
750
751			// Calculate average update size
752			let avg_size = if update_count > 0 { total_size / update_count } else { 0 };
753
754			info!(
755				"CRDT doc stats [{}]: {} updates, {} bytes total, {} bytes avg",
756				doc_id, update_count, total_size, avg_size
757			);
758		}
759		Err(e) => {
760			warn!("Failed to get statistics for doc {}: {}", doc_id, e);
761		}
762	}
763}
764
/// Optimize document by encoding the live Doc state as a single compacted update.
///
/// Uses the in-memory Doc (already has all updates applied) to produce the merged
/// state — no DB reads or doc reconstruction needed. The replacement is atomic
/// (single database transaction) — no data loss on crash.
///
/// Skipped when there are fewer than two updates, when stored updates lack
/// sequence numbers, or when compaction would not shrink the stored size.
async fn optimize_document(app: &App, tn_id: TnId, doc_id: &str, doc: &Arc<Mutex<Doc>>) {
	// Get all existing updates (with seq numbers) for size comparison and seq tracking
	let updates = match app.crdt_adapter.get_updates(tn_id, doc_id).await {
		Ok(u) => u,
		Err(e) => {
			warn!("Failed to get updates for optimization of doc {}: {}", doc_id, e);
			return;
		}
	};

	// Skip optimization if there's only 0 or 1 update
	if updates.len() <= 1 {
		debug!("Skipping optimization for doc {} (only {} updates)", doc_id, updates.len());
		return;
	}

	let updates_before = updates.len();

	// Collect seqs of all updates (we'll replace them all with the merged state)
	let all_seqs: Vec<u64> = updates.iter().filter_map(|u| u.seq).collect();
	if all_seqs.len() != updates.len() {
		warn!(
			"Doc {} has {} updates but only {} have valid seq numbers (possible key corruption)",
			doc_id,
			updates.len(),
			all_seqs.len()
		);
	}
	let size_before: usize = updates.iter().map(|u| u.data.len()).sum();

	// With <=1 seq-bearing update there is nothing meaningful to compact.
	if all_seqs.len() <= 1 {
		debug!(
			"Skipping optimization for doc {} (only {} updates with seq)",
			doc_id,
			all_seqs.len()
		);
		return;
	}

	// Encode merged state from the live Doc (instant — no reconstruction)
	let doc_guard = doc.lock().await;
	let merged_update = doc_guard.transact().encode_state_as_update_v1(&StateVector::default());
	drop(doc_guard);

	// An empty merged state would wipe the document content — never proceed.
	if merged_update.is_empty() {
		warn!("Merged update for doc {} is empty! Aborting optimization.", doc_id);
		return;
	}

	let size_after = merged_update.len();

	// Only proceed if optimization actually reduces size
	if size_after >= size_before {
		info!(
			"Skipping optimization for doc {} (no size reduction: {} -> {})",
			doc_id, size_before, size_after
		);
		return;
	}

	// Atomically replace all updates with the compacted result
	let merged_crdt_update =
		cloudillo_types::crdt_adapter::CrdtUpdate::with_client(merged_update, "system".to_string());

	if let Err(e) = app
		.crdt_adapter
		.compact_updates(tn_id, doc_id, &all_seqs, merged_crdt_update)
		.await
	{
		warn!("Failed to compact updates for doc {}: {}", doc_id, e);
		return;
	}

	// Safe subtraction: the size_after >= size_before case returned above.
	let size_reduction = size_before - size_after;
	let reduction_percent = (usize_to_f64(size_reduction) / usize_to_f64(size_before)) * 100.0;

	info!(
		"CRDT doc optimized [{}]: {} -> 1 updates, {} -> {} bytes ({:.1}% reduction)",
		doc_id, updates_before, size_before, size_after, reduction_percent
	);
}
851
852/// Check if a document has no remaining active connections (read-only).
853///
854/// Returns `true` if the doc is in the registry with zero receivers on both
855/// channels, meaning optimization should be attempted after a grace period.
856/// Does **not** remove the entry — that happens later under a write lock to
857/// avoid TOCTOU races with reconnecting clients.
858async fn is_last_connection(doc_id: &str) -> bool {
859	let docs = CRDT_DOCS.read().await;
860	if let Some(state) = docs.get(doc_id) {
861		let awareness_count = state.awareness_tx.receiver_count();
862		let sync_count = state.sync_tx.receiver_count();
863
864		info!(
865			"Checking CRDT registry for doc {}: {} awareness receivers, {} sync receivers",
866			doc_id, awareness_count, sync_count
867		);
868
869		awareness_count == 0 && sync_count == 0
870	} else {
871		info!("Doc {} not found in registry during cleanup check", doc_id);
872		false
873	}
874}
875
/// Current Unix time in whole seconds (0 if the system clock predates the epoch).
fn now_timestamp() -> u64 {
	use std::time::{SystemTime, UNIX_EPOCH};
	SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.map(|d| d.as_secs())
		.unwrap_or(0)
}
883
884/// Record file access with throttling (max once per TRACKING_THROTTLE_SECS)
885async fn record_file_access_throttled(app: &App, conn: &CrdtConnection) {
886	let should_update = {
887		let mut last_update = conn.last_access_update.lock().await;
888		let now = Instant::now();
889		let should = match *last_update {
890			Some(last) => now.duration_since(last).as_secs() >= TRACKING_THROTTLE_SECS,
891			None => true,
892		};
893		if should {
894			*last_update = Some(now);
895		}
896		should
897	};
898
899	if should_update {
900		if let Err(e) = app
901			.meta_adapter
902			.record_file_access(conn.tn_id, &conn.user_id, &conn.doc_id)
903			.await
904		{
905			debug!("Failed to record file access for doc {}: {}", conn.doc_id, e);
906		}
907	}
908}
909
910/// Record file modification with throttling (max once per TRACKING_THROTTLE_SECS)
911async fn record_file_modification_throttled(app: &App, conn: &CrdtConnection) {
912	// Mark that this session has modifications
913	conn.has_modified.store(true, Ordering::Relaxed);
914
915	let should_update = {
916		let mut last_update = conn.last_modify_update.lock().await;
917		let now = Instant::now();
918		let should = match *last_update {
919			Some(last) => now.duration_since(last).as_secs() >= TRACKING_THROTTLE_SECS,
920			None => true,
921		};
922		if should {
923			*last_update = Some(now);
924		}
925		should
926	};
927
928	if should_update {
929		if let Err(e) = app
930			.meta_adapter
931			.record_file_modification(conn.tn_id, &conn.user_id, &conn.doc_id)
932			.await
933		{
934			debug!("Failed to record file modification for doc {}: {}", conn.doc_id, e);
935		}
936	}
937}
938
939/// Record final access and modification on connection close
940async fn record_final_activity(app: &App, conn: &CrdtConnection) {
941	// Always record final access time
942	if let Err(e) = app
943		.meta_adapter
944		.record_file_access(conn.tn_id, &conn.user_id, &conn.doc_id)
945		.await
946	{
947		debug!("Failed to record final file access for doc {}: {}", conn.doc_id, e);
948	}
949
950	// Record final modification if any changes were made
951	if conn.has_modified.load(Ordering::Relaxed) {
952		if let Err(e) = app
953			.meta_adapter
954			.record_file_modification(conn.tn_id, &conn.user_id, &conn.doc_id)
955			.await
956		{
957			debug!("Failed to record final file modification for doc {}: {}", conn.doc_id, e);
958		}
959	}
960}
961
962// vim: ts=4