1use crate::prelude::*;
17use axum::extract::ws::{Message, WebSocket};
18use futures::sink::SinkExt;
19use futures::stream::SplitSink;
20use futures::stream::StreamExt;
21use std::collections::HashMap;
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::Arc;
24use std::time::Instant;
25use tokio::sync::Mutex;
26use yrs::sync::{Message as YMessage, SyncMessage};
27use yrs::updates::decoder::Decode;
28use yrs::updates::encoder::Encode;
29use yrs::{Doc, Map, ReadTxn, StateVector, Transact, Update};
30
/// Minimum number of seconds between persisted access/modification tracking
/// writes per connection (throttles meta-adapter traffic).
const TRACKING_THROTTLE_SECS: u64 = 60;
33
/// Convert a `usize` to `f64` for ratio/percentage math.
///
/// Precision loss above 2^53 is acceptable: the inputs are byte counts used
/// only in log output, hence the deliberate lint suppression.
#[allow(clippy::cast_precision_loss)]
fn usize_to_f64(value: usize) -> f64 {
    value as f64
}
41
/// Per-WebSocket-connection state for a collaborative CRDT document session.
///
/// Several connections may share one document; the shared parts (`doc` and
/// the broadcast senders) are cloned out of the registry's `DocState`.
struct CrdtConnection {
    // Unique id for this connection (used to suppress broadcast echo to self),
    // plus the connected user's id.
    conn_id: String, user_id: String,
    doc_id: String,
    tn_id: TnId,
    /// Broadcast channel for awareness (presence) messages, shared per doc.
    awareness_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
    /// Broadcast channel for sync (document update) messages, shared per doc.
    sync_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
    /// The live yrs document shared by all connections on this doc.
    doc: Arc<Mutex<Doc>>,
    /// Timestamp of the last persisted access-tracking write (throttle state).
    last_access_update: Mutex<Option<Instant>>,
    /// Timestamp of the last persisted modification-tracking write (throttle state).
    last_modify_update: Mutex<Option<Instant>>,
    /// Set once this connection has stored at least one update.
    has_modified: AtomicBool,
}
59
/// Shared per-document state held in the global registry.
#[derive(Clone)]
struct DocState {
    /// Broadcast sender for awareness messages (one per document).
    awareness_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
    /// Broadcast sender for sync/update messages (one per document).
    sync_tx: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
    /// The live yrs document shared by all connections.
    doc: Arc<Mutex<Doc>>,
}
67
/// Registry mapping `doc_id` -> shared [`DocState`] for all open documents.
type CrdtDocRegistry = tokio::sync::RwLock<HashMap<String, DocState>>;

/// Process-wide registry of live CRDT documents, lazily initialized on first use.
static CRDT_DOCS: std::sync::LazyLock<CrdtDocRegistry> =
    std::sync::LazyLock::new(|| tokio::sync::RwLock::new(HashMap::new()));
74
/// Handle the full lifecycle of one CRDT WebSocket connection.
///
/// Looks up (or creates) the shared document state, performs the initial yjs
/// handshake (server-initiated SyncStep1), spawns heartbeat / receive /
/// broadcast-forwarding tasks, and on disconnect records final activity.
/// If this was the last connection for the doc, the registry entry is removed
/// after a grace period and the stored update history is compacted.
pub async fn handle_crdt_connection(
    ws: WebSocket,
    user_id: String,
    doc_id: String,
    app: App,
    tn_id: TnId,
    read_only: bool,
) {
    // Unique id for this connection; falls back to a timestamp-based id if
    // random id generation fails.
    let conn_id =
        cloudillo_types::utils::random_id().unwrap_or_else(|_| format!("conn-{}", now_timestamp()));
    info!("CRDT connection: {} / {} (tn_id={}, conn_id={})", user_id, doc_id, tn_id.0, conn_id);

    // Fast path: look up the doc under the read lock only.
    let doc_state = {
        let docs = CRDT_DOCS.read().await;
        docs.get(&doc_id).cloned()
    };
    let doc_state = if let Some(state) = doc_state {
        state
    } else {
        // Slow path: take the write lock and re-check — another task may have
        // inserted the doc between releasing the read lock and acquiring this.
        let mut docs = CRDT_DOCS.write().await;
        if let Some(state) = docs.get(&doc_id) {
            state.clone()
        } else {
            let live_doc = match load_or_init_doc(&app, tn_id, &doc_id).await {
                Ok(doc) => doc,
                Err(e) => {
                    warn!("Failed to load doc {}, closing connection: {}", doc_id, e);
                    return;
                }
            };
            let (awareness_tx, _) = tokio::sync::broadcast::channel(256);
            let (sync_tx, _) = tokio::sync::broadcast::channel(256);
            let state = DocState {
                awareness_tx: Arc::new(awareness_tx),
                sync_tx: Arc::new(sync_tx),
                doc: Arc::new(Mutex::new(live_doc)),
            };
            docs.insert(doc_id.clone(), state.clone());
            state
        }
    };

    let conn = Arc::new(CrdtConnection {
        conn_id: conn_id.clone(),
        user_id: user_id.clone(),
        doc_id: doc_id.clone(),
        tn_id,
        awareness_tx: doc_state.awareness_tx,
        sync_tx: doc_state.sync_tx,
        doc: doc_state.doc,
        last_access_update: Mutex::new(None),
        last_modify_update: Mutex::new(None),
        has_modified: AtomicBool::new(false),
    });

    record_file_access_throttled(&app, &conn).await;

    // The write half is shared between the heartbeat, echo and broadcast paths.
    let (ws_tx, ws_rx) = ws.split();
    let ws_tx: Arc<tokio::sync::Mutex<_>> = Arc::new(tokio::sync::Mutex::new(ws_tx));

    // Server-initiated sync: send our state vector so the client can reply
    // with the updates the server is missing (yjs SyncStep1).
    {
        let doc_guard = conn.doc.lock().await;
        let sv = doc_guard.transact().state_vector();
        drop(doc_guard);
        let y_msg = YMessage::Sync(SyncMessage::SyncStep1(sv));
        let encoded = y_msg.encode_v1();
        info!("Sent SyncStep1 to {} for doc {} ({} bytes)", user_id, doc_id, encoded.len());
        let mut tx = ws_tx.lock().await;
        if let Err(e) = tx.send(Message::Binary(encoded.into())).await {
            warn!("Failed to send SyncStep1 to {}: {}", user_id, e);
        }
    }

    let heartbeat_task = spawn_heartbeat_task(user_id.clone(), ws_tx.clone());

    let ws_recv_task =
        spawn_receive_task(conn.clone(), ws_tx.clone(), ws_rx, app.clone(), tn_id, read_only);

    let sync_task =
        spawn_broadcast_task(conn.clone(), ws_tx.clone(), conn.sync_tx.subscribe(), "SYNC");

    let awareness_task = spawn_broadcast_task(
        conn.clone(),
        ws_tx.clone(),
        conn.awareness_tx.subscribe(),
        "AWARENESS",
    );

    // The receive task ending (client close or transport error) drives the
    // entire teardown below.
    let _ = ws_recv_task.await;
    debug!("WebSocket receive task ended");

    record_final_activity(&app, &conn).await;

    info!("CRDT connection closing for {}, aborting tasks...", user_id);
    heartbeat_task.abort();
    sync_task.abort();
    awareness_task.abort();

    // Await the aborted handles so their resources are fully released before
    // checking whether this was the last connection.
    let _ = tokio::join!(heartbeat_task, sync_task, awareness_task);
    info!("CRDT connection closed: {} (all tasks cleaned up)", user_id);

    log_doc_statistics(&app, tn_id, &conn.doc_id).await;

    if is_last_connection(&conn.doc_id).await {
        info!("Last connection closed for doc {}, waiting before optimization...", conn.doc_id);

        // Grace period: a client may reconnect immediately (e.g. page reload);
        // avoid tearing down and re-loading the doc in that case.
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        // Re-check under the write lock and only remove the registry entry if
        // still nobody is subscribed to either broadcast channel.
        let removed = {
            let mut docs = CRDT_DOCS.write().await;
            if let Some(state) = docs.get(&conn.doc_id) {
                if state.awareness_tx.receiver_count() == 0 && state.sync_tx.receiver_count() == 0 {
                    docs.remove(&conn.doc_id)
                } else {
                    None
                }
            } else {
                None
            }
        };

        if let Some(doc_state) = removed {
            info!(
                "Confirmed no active connections for doc {}, proceeding with optimization",
                conn.doc_id
            );
            optimize_document(&app, tn_id, &conn.doc_id, &doc_state.doc).await;
        } else {
            info!(
                "New connection established for doc {} during grace period, skipping optimization",
                conn.doc_id
            );
        }
    }
}
252
/// Load a document's persisted updates into a fresh `Doc`, or initialize a
/// brand-new document when nothing has been stored yet.
///
/// Initialization writes a minimal `meta` map (key `"i"`) and persists the
/// resulting state as the first update, attributed to the `"system"` client.
/// Existing histories are decoded and replayed on the worker pool (the replay
/// is pure computation); individually corrupt updates are skipped with a
/// warning rather than failing the whole load.
async fn load_or_init_doc(app: &App, tn_id: TnId, doc_id: &str) -> ClResult<Doc> {
    let updates = app.crdt_adapter.get_updates(tn_id, doc_id).await?;

    if updates.is_empty() {
        info!("Document {} not initialized, creating initial structure", doc_id);
        let doc = Doc::new();
        let meta = doc.get_or_insert_map("meta");
        {
            let mut txn = doc.transact_mut();
            meta.insert(&mut txn, "i", true);
        }

        // Encode the full initial state so future loads can replay it.
        let initial_data = doc.transact().encode_state_as_update_v1(&StateVector::default());
        if !initial_data.is_empty() {
            let update = cloudillo_types::crdt_adapter::CrdtUpdate::with_client(
                initial_data,
                "system".to_string(),
            );
            // Persist failure is non-fatal here: the in-memory doc is usable.
            if let Err(e) = app.crdt_adapter.store_update(tn_id, doc_id, update).await {
                warn!("Failed to store initial CRDT update for doc {}: {}", doc_id, e);
            } else {
                info!("Document {} initialized", doc_id);
            }
        }
        Ok(doc)
    } else {
        let total_bytes: usize = updates.iter().map(|u| u.data.len()).sum();
        info!("Loading {} CRDT updates for doc {} ({} bytes)", updates.len(), doc_id, total_bytes);
        let updates_data: Vec<Vec<u8>> = updates.iter().map(|u| u.data.clone()).collect();
        let doc_id_owned = doc_id.to_string();
        match app
            .worker
            .run_immed(move || {
                let doc = Doc::new();
                {
                    let mut txn = doc.transact_mut();
                    // Replay updates in stored order; bad entries are logged
                    // and skipped so one corrupt update cannot brick the doc.
                    for (idx, data) in updates_data.iter().enumerate() {
                        match Update::decode_v1(data) {
                            Ok(update) => {
                                if let Err(e) = txn.apply_update(update) {
                                    warn!(
                                        "Update #{} for doc {} failed to apply: {}",
                                        idx, doc_id_owned, e
                                    );
                                }
                            }
                            Err(e) => {
                                warn!(
                                    "Update #{} for doc {} failed to decode: {}",
                                    idx, doc_id_owned, e
                                );
                            }
                        }
                    }
                }
                doc
            })
            .await
        {
            Ok(doc) => Ok(doc),
            Err(e) => {
                warn!("Worker pool failed loading doc {}: {}", doc_id, e);
                Err(Error::Internal(format!("Worker pool failed loading doc {}", doc_id)))
            }
        }
    }
}
325
326fn spawn_heartbeat_task(
328 user_id: String,
329 ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
330) -> tokio::task::JoinHandle<()> {
331 tokio::spawn(async move {
332 let mut interval = tokio::time::interval(std::time::Duration::from_secs(15));
333 loop {
334 interval.tick().await;
335 debug!("CRDT heartbeat: {}", user_id);
336
337 let mut tx = ws_tx.lock().await;
338 if tx.send(Message::Ping(vec![].into())).await.is_err() {
339 debug!("Client disconnected during heartbeat");
340 return;
341 }
342 }
343 })
344}
345
346fn spawn_receive_task(
348 conn: Arc<CrdtConnection>,
349 ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
350 ws_rx: futures::stream::SplitStream<WebSocket>,
351 app: App,
352 tn_id: TnId,
353 read_only: bool,
354) -> tokio::task::JoinHandle<()> {
355 tokio::spawn(async move {
356 let mut ws_rx = ws_rx;
357 while let Some(msg) = ws_rx.next().await {
358 match msg {
359 Ok(Message::Binary(data)) => {
360 handle_yrs_message(&conn, &data, &ws_tx, &app, tn_id, read_only).await;
362 }
363 Ok(Message::Close(_) | Message::Ping(_) | Message::Pong(_)) => {
364 }
366 Ok(_) => {
367 warn!("Received non-binary WebSocket message");
368 }
369 Err(e) => {
370 warn!("CRDT connection error: {}", e);
371 break;
372 }
373 }
374 }
375 })
376}
377
/// Spawn the task that forwards per-document broadcasts (SYNC or AWARENESS,
/// selected by `label`) to this connection's WebSocket.
///
/// Each broadcast payload carries the originating conn_id so a connection's
/// own messages are skipped here (the sender receives a direct echo via
/// `send_echo_raw` instead). The task ends when the client disconnects or
/// the broadcast channel closes.
fn spawn_broadcast_task(
    conn: Arc<CrdtConnection>,
    ws_tx: Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
    mut rx: tokio::sync::broadcast::Receiver<(String, Vec<u8>)>,
    label: &'static str,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        debug!(
            "Connection {} (user {}) subscribed to {} broadcasts for doc {}",
            conn.conn_id, conn.user_id, label, conn.doc_id
        );

        loop {
            match rx.recv().await {
                Ok((sender_conn_id, data)) => {
                    debug!(
                        "{} broadcast received by conn {}: from conn {} for doc {} ({} bytes)",
                        label,
                        conn.conn_id,
                        sender_conn_id,
                        conn.doc_id,
                        data.len()
                    );

                    // Never reflect a connection's own messages back through
                    // the broadcast path.
                    if sender_conn_id == conn.conn_id {
                        debug!("Skipping {} echo to self for conn {}", label, conn.conn_id);
                        continue;
                    }

                    let ws_msg = Message::Binary(data.into());

                    debug!(
                        "Forwarding {} update from conn {} to conn {} (user {}) for doc {}",
                        label, sender_conn_id, conn.conn_id, conn.user_id, conn.doc_id
                    );

                    let mut tx = ws_tx.lock().await;
                    if tx.send(ws_msg).await.is_err() {
                        debug!("Client disconnected while forwarding {} update", label);
                        return;
                    }
                    debug!("{} update successfully forwarded to conn {}", label, conn.conn_id);
                }
                // Lagging on SYNC can mean dropped document updates (warn);
                // lagging on AWARENESS only loses transient presence data.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
                    if label == "SYNC" {
                        warn!(
                            "Client {} lagged behind on {} updates for doc {}",
                            conn.user_id, label, conn.doc_id
                        );
                    } else {
                        debug!("Connection {} lagged on {} updates", conn.conn_id, label);
                    }
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    debug!("{} broadcast channel closed", label);
                    return;
                }
            }
        }
    })
}
443
444fn broadcast_message(
446 tx: &Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
447 conn_id: &str,
448 user_id: &str,
449 doc_id: &str,
450 payload: Vec<u8>,
451 label: &str,
452) {
453 match tx.send((conn_id.to_string(), payload)) {
454 Ok(receiver_count) => {
455 if label != "AWARENESS" {
456 info!(
457 "CRDT {} broadcast from conn {} (user {}) for doc {}: {} receivers",
458 label, conn_id, user_id, doc_id, receiver_count
459 );
460 }
461 }
462 Err(_) => {
463 debug!("CRDT {} broadcast failed - no receivers for doc {}", label, doc_id);
464 }
465 }
466}
467
468async fn send_echo_raw(
470 ws_tx: &Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
471 conn_id: &str,
472 user_id: &str,
473 doc_id: &str,
474 payload: &[u8],
475 label: &str,
476) {
477 let ws_msg = Message::Binary(payload.to_vec().into());
478 let mut tx = ws_tx.lock().await;
479
480 match tx.send(ws_msg).await {
481 Ok(()) => {
482 debug!(
483 "CRDT {} echo sent back to conn {} (user {}) for doc {} ({} bytes)",
484 label,
485 conn_id,
486 user_id,
487 doc_id,
488 payload.len()
489 );
490 }
491 Err(e) => {
492 warn!("Failed to send CRDT {} echo to conn {}: {}", label, conn_id, e);
493 }
494 }
495}
496
/// Apply an incoming CRDT update to the live document and persist it.
///
/// Returns `true` only when the update was both applied and stored — i.e.
/// when the caller should re-broadcast it to other connections. Returns
/// `false` for read-only connections, empty or malformed payloads, no-op
/// updates (state the doc already has), and persistence failures.
async fn apply_and_store(
    app: &App,
    tn_id: TnId,
    conn: &Arc<CrdtConnection>,
    update_data: &[u8],
    read_only: bool,
    msg_type: &str,
) -> bool {
    if read_only {
        debug!(
            "Ignoring {} from read-only conn {} for doc {}",
            msg_type, conn.conn_id, conn.doc_id
        );
        return false;
    }
    if update_data.is_empty() {
        debug!("Received empty {} from conn {}", msg_type, conn.conn_id);
        return false;
    }

    // Apply under the doc lock. No-ops are detected by comparing document
    // snapshots taken before and after the apply, so redundant updates are
    // not persisted again. Early returns inside this block drop the guard.
    let is_noop = {
        let doc_guard = conn.doc.lock().await;
        let snapshot_before = doc_guard.transact().snapshot();
        match Update::decode_v1(update_data) {
            Ok(decoded) => {
                if let Err(e) = doc_guard.transact_mut().apply_update(decoded) {
                    warn!("Failed to apply {} to live doc {}: {}", msg_type, conn.doc_id, e);
                    return false;
                }
            }
            Err(e) => {
                warn!(
                    "Rejecting malformed {} from conn {} - decode failed: {}",
                    msg_type, conn.conn_id, e
                );
                return false;
            }
        }
        let snapshot_after = doc_guard.transact().snapshot();
        snapshot_before == snapshot_after
    };

    if is_noop {
        debug!(
            "{} is a no-op for doc {} ({} bytes) - skipping persist",
            msg_type,
            conn.doc_id,
            update_data.len()
        );
        return false;
    }

    // Persist the raw update attributed to this user. On failure the live
    // doc has already advanced past what storage holds (see log message).
    let update = cloudillo_types::crdt_adapter::CrdtUpdate::with_client(
        update_data.to_vec(),
        conn.user_id.clone(),
    );
    if let Err(e) = app.crdt_adapter.store_update(tn_id, &conn.doc_id, update).await {
        warn!(
            "{} FAILED to store for doc {}: {} - live doc is ahead of DB",
            msg_type, conn.doc_id, e
        );
        return false;
    }

    info!(
        "{} stored for doc {} from user {} ({} bytes)",
        msg_type,
        conn.doc_id,
        conn.user_id,
        update_data.len()
    );
    record_file_modification_throttled(app, conn).await;
    true
}
583
/// Decode and dispatch one incoming yjs protocol message.
///
/// Sync messages: SyncStep1 is answered directly with a SyncStep2 diff
/// against the client's state vector; SyncStep2 and Update payloads are
/// applied and persisted via `apply_and_store`, then re-broadcast to the
/// doc's other connections and echoed to the sender. Awareness messages are
/// broadcast and echoed without touching the document.
async fn handle_yrs_message(
    conn: &Arc<CrdtConnection>,
    data: &[u8],
    ws_tx: &Arc<tokio::sync::Mutex<SplitSink<WebSocket, Message>>>,
    app: &App,
    tn_id: TnId,
    read_only: bool,
) {
    if data.is_empty() {
        warn!("Empty message from conn {}", conn.conn_id);
        return;
    }

    match YMessage::decode_v1(data) {
        Ok(YMessage::Sync(sync_msg)) => {
            debug!(
                "CRDT SYNC message from conn {} (user {}) for doc {}: {:?}",
                conn.conn_id,
                conn.user_id,
                conn.doc_id,
                match &sync_msg {
                    SyncMessage::SyncStep1(_) => "SyncStep1",
                    SyncMessage::SyncStep2(_) => "SyncStep2",
                    SyncMessage::Update(_) => "Update",
                }
            );

            match &sync_msg {
                SyncMessage::SyncStep1(client_sv) => {
                    info!(
                        "Received SyncStep1 from conn {} (user {}) for doc {} ({} bytes)",
                        conn.conn_id,
                        conn.user_id,
                        conn.doc_id,
                        data.len()
                    );
                    let doc_guard = conn.doc.lock().await;
                    let server_sv = doc_guard.transact().state_vector();
                    debug!(
                        "SV comparison for doc {}: server={} clients, client={} clients",
                        conn.doc_id,
                        server_sv.len(),
                        client_sv.len()
                    );
                    // Diff of everything the client is missing relative to
                    // its advertised state vector.
                    let diff = doc_guard.transact().encode_state_as_update_v1(client_sv);
                    drop(doc_guard);

                    let mut tx = ws_tx.lock().await;
                    let msg = YMessage::Sync(SyncMessage::SyncStep2(diff.clone()));
                    if let Err(e) = tx.send(Message::Binary(msg.encode_v1().into())).await {
                        warn!("Failed to send SyncStep2 to {}: {}", conn.user_id, e);
                    } else {
                        info!(
                            "Sent SyncStep2 to conn {} for doc {} ({} bytes)",
                            conn.conn_id,
                            conn.doc_id,
                            diff.len()
                        );
                    }
                    // SyncStep1 is a request, not a document change: nothing
                    // to broadcast, so stop here.
                    return;
                }
                SyncMessage::SyncStep2(update_data) => {
                    info!(
                        "Received SyncStep2 from conn {} (user {}) for doc {} ({} bytes)",
                        conn.conn_id,
                        conn.user_id,
                        conn.doc_id,
                        update_data.len()
                    );
                    // Bail out unless the update actually changed and was stored.
                    if !apply_and_store(app, tn_id, conn, update_data, read_only, "SyncStep2").await
                    {
                        return;
                    }
                }
                SyncMessage::Update(update_data) => {
                    if !apply_and_store(app, tn_id, conn, update_data, read_only, "Update").await {
                        return;
                    }
                }
            }

            // A SyncStep2 payload is re-wrapped as an Update before broadcast:
            // other clients expect ordinary updates, not a step-2 reply.
            let broadcast_data = match &sync_msg {
                SyncMessage::SyncStep2(update_data) => {
                    let msg = YMessage::Sync(SyncMessage::Update(update_data.clone()));
                    msg.encode_v1()
                }
                _ => data.to_vec(),
            };

            broadcast_message(
                &conn.sync_tx,
                &conn.conn_id,
                &conn.user_id,
                &conn.doc_id,
                broadcast_data.clone(),
                "SYNC",
            );

            // Also reflect the message back to the sender (the broadcast path
            // skips the originating connection).
            send_echo_raw(
                ws_tx,
                &conn.conn_id,
                &conn.user_id,
                &conn.doc_id,
                &broadcast_data,
                "SYNC",
            )
            .await;
        }
        Ok(YMessage::Awareness(_awareness_update)) => {
            debug!(
                "CRDT AWARENESS from conn {} (user {}) for doc {} ({} bytes)",
                conn.conn_id,
                conn.user_id,
                conn.doc_id,
                data.len()
            );

            // Awareness is transient presence data: broadcast and echo the
            // raw frame, never persisted.
            broadcast_message(
                &conn.awareness_tx,
                &conn.conn_id,
                &conn.user_id,
                &conn.doc_id,
                data.to_vec(),
                "AWARENESS",
            );

            send_echo_raw(ws_tx, &conn.conn_id, &conn.user_id, &conn.doc_id, data, "AWARENESS")
                .await;
        }
        Ok(other) => {
            debug!("Received non-sync/awareness message: {:?}", other);
        }
        Err(e) => {
            warn!("Failed to decode yrs message from conn {}: {}", conn.conn_id, e);
        }
    }
}
743
744async fn log_doc_statistics(app: &App, tn_id: TnId, doc_id: &str) {
746 match app.crdt_adapter.get_updates(tn_id, doc_id).await {
747 Ok(updates) => {
748 let update_count = updates.len();
749 let total_size: usize = updates.iter().map(|u| u.data.len()).sum();
750
751 let avg_size = if update_count > 0 { total_size / update_count } else { 0 };
753
754 info!(
755 "CRDT doc stats [{}]: {} updates, {} bytes total, {} bytes avg",
756 doc_id, update_count, total_size, avg_size
757 );
758 }
759 Err(e) => {
760 warn!("Failed to get statistics for doc {}: {}", doc_id, e);
761 }
762 }
763}
764
/// Compact a document's persisted update history into a single merged update.
///
/// The merged state is encoded from the in-memory doc (which already has all
/// updates applied) and replaces the seq-numbered updates via the adapter's
/// `compact_updates`. Compaction is skipped when there is at most one update,
/// when it would not reduce total size, or when the merged encoding is empty.
///
/// NOTE(review): only updates with a `seq` are removed, yet their content is
/// also contained in the merged update — updates lacking a seq would remain
/// alongside it (the warning below flags this). Confirm adapter semantics.
async fn optimize_document(app: &App, tn_id: TnId, doc_id: &str, doc: &Arc<Mutex<Doc>>) {
    let updates = match app.crdt_adapter.get_updates(tn_id, doc_id).await {
        Ok(u) => u,
        Err(e) => {
            warn!("Failed to get updates for optimization of doc {}: {}", doc_id, e);
            return;
        }
    };

    if updates.len() <= 1 {
        debug!("Skipping optimization for doc {} (only {} updates)", doc_id, updates.len());
        return;
    }

    let updates_before = updates.len();

    // Only seq-numbered updates can be removed by compact_updates.
    let all_seqs: Vec<u64> = updates.iter().filter_map(|u| u.seq).collect();
    if all_seqs.len() != updates.len() {
        warn!(
            "Doc {} has {} updates but only {} have valid seq numbers (possible key corruption)",
            doc_id,
            updates.len(),
            all_seqs.len()
        );
    }
    let size_before: usize = updates.iter().map(|u| u.data.len()).sum();

    if all_seqs.len() <= 1 {
        debug!(
            "Skipping optimization for doc {} (only {} updates with seq)",
            doc_id,
            all_seqs.len()
        );
        return;
    }

    // Encode the complete current state from the live in-memory document.
    let doc_guard = doc.lock().await;
    let merged_update = doc_guard.transact().encode_state_as_update_v1(&StateVector::default());
    drop(doc_guard);

    if merged_update.is_empty() {
        warn!("Merged update for doc {} is empty! Aborting optimization.", doc_id);
        return;
    }

    let size_after = merged_update.len();

    // Compaction is only worthwhile if it actually shrinks stored bytes.
    if size_after >= size_before {
        info!(
            "Skipping optimization for doc {} (no size reduction: {} -> {})",
            doc_id, size_before, size_after
        );
        return;
    }

    let merged_crdt_update =
        cloudillo_types::crdt_adapter::CrdtUpdate::with_client(merged_update, "system".to_string());

    if let Err(e) = app
        .crdt_adapter
        .compact_updates(tn_id, doc_id, &all_seqs, merged_crdt_update)
        .await
    {
        warn!("Failed to compact updates for doc {}: {}", doc_id, e);
        return;
    }

    // size_after < size_before is guaranteed above, so this cannot underflow.
    let size_reduction = size_before - size_after;
    let reduction_percent = (usize_to_f64(size_reduction) / usize_to_f64(size_before)) * 100.0;

    info!(
        "CRDT doc optimized [{}]: {} -> 1 updates, {} -> {} bytes ({:.1}% reduction)",
        doc_id, updates_before, size_before, size_after, reduction_percent
    );
}
851
852async fn is_last_connection(doc_id: &str) -> bool {
859 let docs = CRDT_DOCS.read().await;
860 if let Some(state) = docs.get(doc_id) {
861 let awareness_count = state.awareness_tx.receiver_count();
862 let sync_count = state.sync_tx.receiver_count();
863
864 info!(
865 "Checking CRDT registry for doc {}: {} awareness receivers, {} sync receivers",
866 doc_id, awareness_count, sync_count
867 );
868
869 awareness_count == 0 && sync_count == 0
870 } else {
871 info!("Doc {} not found in registry during cleanup check", doc_id);
872 false
873 }
874}
875
/// Seconds since the Unix epoch, or 0 if the system clock reads before 1970.
fn now_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
883
884async fn record_file_access_throttled(app: &App, conn: &CrdtConnection) {
886 let should_update = {
887 let mut last_update = conn.last_access_update.lock().await;
888 let now = Instant::now();
889 let should = match *last_update {
890 Some(last) => now.duration_since(last).as_secs() >= TRACKING_THROTTLE_SECS,
891 None => true,
892 };
893 if should {
894 *last_update = Some(now);
895 }
896 should
897 };
898
899 if should_update {
900 if let Err(e) = app
901 .meta_adapter
902 .record_file_access(conn.tn_id, &conn.user_id, &conn.doc_id)
903 .await
904 {
905 debug!("Failed to record file access for doc {}: {}", conn.doc_id, e);
906 }
907 }
908}
909
910async fn record_file_modification_throttled(app: &App, conn: &CrdtConnection) {
912 conn.has_modified.store(true, Ordering::Relaxed);
914
915 let should_update = {
916 let mut last_update = conn.last_modify_update.lock().await;
917 let now = Instant::now();
918 let should = match *last_update {
919 Some(last) => now.duration_since(last).as_secs() >= TRACKING_THROTTLE_SECS,
920 None => true,
921 };
922 if should {
923 *last_update = Some(now);
924 }
925 should
926 };
927
928 if should_update {
929 if let Err(e) = app
930 .meta_adapter
931 .record_file_modification(conn.tn_id, &conn.user_id, &conn.doc_id)
932 .await
933 {
934 debug!("Failed to record file modification for doc {}: {}", conn.doc_id, e);
935 }
936 }
937}
938
939async fn record_final_activity(app: &App, conn: &CrdtConnection) {
941 if let Err(e) = app
943 .meta_adapter
944 .record_file_access(conn.tn_id, &conn.user_id, &conn.doc_id)
945 .await
946 {
947 debug!("Failed to record final file access for doc {}: {}", conn.doc_id, e);
948 }
949
950 if conn.has_modified.load(Ordering::Relaxed) {
952 if let Err(e) = app
953 .meta_adapter
954 .record_file_modification(conn.tn_id, &conn.user_id, &conn.doc_id)
955 .await
956 {
957 debug!("Failed to record final file modification for doc {}: {}", conn.doc_id, e);
958 }
959 }
960}
961
962