1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24 report::AnyOutput,
25 request::RequestContext,
26 server::{
27 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28 client_registry::try_client_registry,
29 },
30 wire::{
31 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33 },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37 accept_async_with_config,
38 tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
/// Exact text-frame payload a client sends to ask for the msgpack (binary)
/// protocol. When the connection loop receives it, it replies with a
/// `ProtocolSwitch` message and serializes later outbound messages as msgpack.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
/// Process-wide counters of events received over WebSocket connections.
struct WsIngestStats {
    /// One counter per event item type. `record_ws_ingest` increments them and
    /// the ingest logger thread swaps each back to zero once per second.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide message/byte counters read by the benchmark logger thread.
///
/// NOTE(review): the code that increments these counters is not in this part
/// of the file; presumably a benchmark send/receive path — confirm.
struct WsBenchmarkStats {
    /// Messages counted since the logger last swapped the value to zero.
    message_count: AtomicU64,
    /// Bytes counted since the logger last swapped the value to zero.
    total_bytes: AtomicU64,
}
54
// Lazily created ingest counters; access through `ws_ingest_stats()`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Flipped to `true` by the first `ensure_ws_ingest_logger()` call so the
// logger thread is spawned at most once.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily created benchmark counters; access through `ws_benchmark_stats()`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Flipped to `true` by the first `ensure_ws_benchmark_logger()` call so the
// logger thread is spawned at most once.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 if count > 0 { bytes / count } else { 0 },
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 if let Some(obj) = event.item.as_object_mut()
194 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195 {
196 for reg in iter_fallback_to_id_registrations() {
197 if reg.entity_type == event.item_type {
198 let field = reg.field_name_json;
199 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201 }
202 }
203 }
204 }
205}
206
/// Rate-limited reporter for outbound messages that could not be queued for a
/// client. See `DropLogger::on_drop`.
struct DropLogger {
    /// Client id included in every warning line.
    client_id: Arc<str>,
    /// Drops counted since the last emitted warning; reset to 0 when one is logged.
    dropped: std::sync::atomic::AtomicU64,
    /// Unix-epoch milliseconds at which the last warning was emitted; 0 until the first.
    last_log_ms: std::sync::atomic::AtomicU64,
}
216
217impl DropLogger {
218 fn new(client_id: Arc<str>) -> Self {
219 Self {
220 client_id,
221 dropped: std::sync::atomic::AtomicU64::new(0),
222 last_log_ms: std::sync::atomic::AtomicU64::new(0),
223 }
224 }
225
226 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227 use std::sync::atomic::Ordering;
228
229 self.dropped.fetch_add(1, Ordering::Relaxed);
230
231 let now_ms = std::time::SystemTime::now()
233 .duration_since(std::time::UNIX_EPOCH)
234 .unwrap_or_default()
235 .as_millis() as u64;
236 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237 if now_ms.saturating_sub(last_ms) < 1000 {
238 return;
239 }
240
241 if self
242 .last_log_ms
243 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244 .is_err()
245 {
246 return;
247 }
248
249 let n = self.dropped.swap(0, Ordering::Relaxed);
250 log::warn!(
251 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252 n,
253 self.client_id,
254 kind,
255 err
256 );
257 }
258}
259
/// A client command queued for the per-connection command worker task.
struct CommandJob {
    /// Transaction id taken from the command payload's `"tx"` field
    /// (`"unknown"` when absent).
    tx_id: Arc<str>,
    /// Identifier of the command, copied from the incoming message.
    command_id: String,
    /// Raw JSON payload of the command.
    command: serde_json::Value,
    /// When the command message was received from the client.
    received_at: Instant,
}
266
/// A query or view whose cell map was built on a blocking thread and is ready
/// to be attached to the client session by the connection loop.
enum SubscriptionReady {
    /// Built query; the connection loop passes it to `session.subscribe_query`.
    Query {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window copied from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// Built view; the connection loop passes it to `session.subscribe_view_with_id`.
    View {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// Identifier of the view being subscribed to.
        view_id: Arc<str>,
        /// Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window copied from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
}
281
/// An item on the normal-priority outbound queue consumed by the writer task.
enum OutboundMessage {
    /// A message the writer task serializes itself, as JSON or msgpack
    /// depending on the connection's current protocol.
    Message(MykoMessage),
    /// A command whose payload is already encoded; the writer task sends it
    /// as-is (text frame for JSON, binary frame for msgpack).
    SerializedCommand {
        /// Transaction id; when non-blank the writer task records it so a
        /// later response can be correlated.
        tx: Arc<str>,
        /// Identifier of the command, recorded alongside `tx`.
        command_id: String,
        /// The pre-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
290
/// An outbound item whose conversion to a wire message is postponed until the
/// writer task dequeues it.
enum DeferredOutbound {
    /// Report output for the given transaction id; the writer task turns it
    /// into a `ReportResponse` via `to_value()`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view update; the writer task calls `into_wire()` on it.
    Query {
        /// The update to convert.
        response: PendingQueryResponse,
        /// `true` to send it as a `ViewResponse`, `false` as a `QueryResponse`.
        is_view: bool,
    },
}
298
/// Stateless namespace for the WebSocket connection handling functions
/// (`handle_connection` and its helpers).
pub struct WsHandler;
301
302impl WsHandler {
303 #[allow(clippy::too_many_arguments)]
305 pub async fn handle_connection(
306 stream: TcpStream,
307 addr: SocketAddr,
308 ctx: Arc<CellServerCtx>,
309 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310 ensure_ws_ingest_logger();
311
312 let host_id = ctx.host_id;
313
314 let mut ws_config = WebSocketConfig::default();
315 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318 let (mut write, mut read) = ws_stream.split();
319
320 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328 let use_binary = Arc::new(AtomicBool::new(false));
330
331 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334 let writer = ChannelWriter {
335 tx: tx.clone(),
336 deferred_tx: deferred_tx.clone(),
337 drop_logger: drop_logger.clone(),
338 use_binary_writer: use_binary.clone(),
339 };
340
341 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343 tx: tx.clone(),
344 deferred_tx: deferred_tx.clone(),
345 drop_logger: drop_logger.clone(),
346 use_binary_writer: use_binary.clone(),
347 });
348 if let Some(registry) = try_client_registry() {
349 registry.register(client_id.clone(), writer_arc);
350 }
351
352 let mut session = ClientSession::new(client_id.clone(), writer);
353
354 let use_binary_writer = use_binary.clone();
355 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356 Arc::new(Mutex::new(HashMap::new()));
357 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358 Arc::new(Mutex::new(HashMap::new()));
359 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360 Arc::new(Mutex::new(HashMap::new()));
361 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362 Arc::new(Mutex::new(HashMap::new()));
363 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364 Arc::new(Mutex::new(HashMap::new()));
365
366 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368 let client_entity = Client {
370 id: ClientId(client_id.clone()),
371 server_id: host_id.to_string().into(),
372 windback: None,
373 };
374 if let Err(e) = ctx.set(&client_entity) {
375 log::error!("Failed to persist client entity: {e}");
376 }
377
378 log::info!("Client connected: {} from {}", client_id, addr);
379
380 let write_ctx = ctx.clone();
381 let write_client_id = client_id.clone();
382 let write_addr = addr;
383 let command_ctx = ctx.clone();
384 let command_priority_tx = priority_tx.clone();
385 let command_drop_logger = drop_logger.clone();
386 let command_client_id = client_id.clone();
387
388 let write_task = tokio::spawn(async move {
390 let _ctx = write_ctx;
391 let mut normal_open = true;
392 let mut priority_open = true;
393 let mut deferred_open = true;
394 while normal_open || priority_open || deferred_open {
395 let msg = tokio::select! {
396 biased;
397 maybe = priority_rx.recv(), if priority_open => {
398 match maybe {
399 Some(msg) => OutboundMessage::Message(msg),
400 None => {
401 priority_open = false;
402 continue;
403 }
404 }
405 }
406 maybe = deferred_rx.recv(), if deferred_open => {
407 match maybe {
408 Some(DeferredOutbound::Report(tx, output)) => {
409 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
410 response: output.to_value(),
411 tx: tx.to_string(),
412 }))
413 }
414 Some(DeferredOutbound::Query { response, is_view }) => {
415 if is_view {
416 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
417 } else {
418 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
419 }
420 }
421 None => {
422 deferred_open = false;
423 continue;
424 }
425 }
426 }
427 maybe = rx.recv(), if normal_open => {
428 match maybe {
429 Some(msg) => msg,
430 None => {
431 normal_open = false;
432 continue;
433 }
434 }
435 }
436 };
437 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
438 OutboundMessage::SerializedCommand { tx, .. } => {
439 ("command", Some(tx.clone()), None, None, None, None)
440 }
441 OutboundMessage::Message(msg) => match msg {
442 MykoMessage::ViewResponse(r) => (
443 "view_response",
444 Some(r.tx.clone()),
445 Some(r.sequence),
446 Some(r.upserts.len()),
447 Some(r.deletes.len()),
448 r.total_count,
449 ),
450 MykoMessage::QueryResponse(r) => (
451 "query_response",
452 Some(r.tx.clone()),
453 Some(r.sequence),
454 Some(r.upserts.len()),
455 Some(r.deletes.len()),
456 r.total_count,
457 ),
458 MykoMessage::CommandResponse(r) => (
459 "command_response",
460 Some(Arc::<str>::from(r.tx.clone())),
461 None,
462 None,
463 None,
464 None,
465 ),
466 MykoMessage::CommandError(r) => (
467 "command_error",
468 Some(Arc::<str>::from(r.tx.clone())),
469 None,
470 None,
471 None,
472 None,
473 ),
474 _ => ("other", None, None, None, None, None),
475 },
476 };
477
478 match &msg {
479 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
480 if !tx.trim().is_empty()
481 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
482 {
483 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
484 }
485 }
486 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
487 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
488 && !tx_id.trim().is_empty()
489 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
490 {
491 map.insert(
492 tx_id.to_string(),
493 (wrapped.command_id.clone(), Instant::now()),
494 );
495 }
496 }
497 _ => {}
498 }
499
500 let ws_msg = match &msg {
501 OutboundMessage::SerializedCommand {
502 payload: EncodedCommandMessage::Json(json),
503 ..
504 } => Message::Text(json.clone().into()),
505 OutboundMessage::SerializedCommand {
506 payload: EncodedCommandMessage::Msgpack(bytes),
507 ..
508 } => Message::Binary(bytes.clone().into()),
509 OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
510 match rmp_serde::to_vec(msg) {
511 Ok(bytes) => Message::Binary(bytes.into()),
512 Err(e) => {
513 log::error!("Failed to serialize message to msgpack: {}", e);
514 continue;
515 }
516 }
517 }
518 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
519 Ok(json) => Message::Text(json.into()),
520 Err(e) => {
521 log::error!("Failed to serialize message to JSON: {}", e);
522 continue;
523 }
524 },
525 };
526 let payload_bytes = match &ws_msg {
527 Message::Binary(b) => b.len(),
528 Message::Text(t) => t.len(),
529 _ => 0,
530 };
531
532 if let Err(err) = write.send(ws_msg).await {
533 log::error!(
534 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
535 write_client_id,
536 write_addr,
537 kind,
538 tx_id,
539 seq,
540 payload_bytes,
541 use_binary_writer.load(Ordering::SeqCst),
542 err
543 );
544 break;
545 }
546 }
547 log::warn!(
548 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
549 write_client_id,
550 write_addr,
551 normal_open,
552 priority_open,
553 deferred_open
554 );
555 });
556
557 let command_started_cleanup = command_started_by_tx.clone();
560 let command_task = tokio::spawn(async move {
561 while let Some(job) = command_rx.recv().await {
562 let command_ctx = command_ctx.clone();
563 let command_priority_tx = command_priority_tx.clone();
564 let command_drop_logger = command_drop_logger.clone();
565 let command_client_id = command_client_id.clone();
566 let tx_id = job.tx_id.clone();
567 let started_map = command_started_cleanup.clone();
568 match tokio::task::spawn_blocking(move || {
569 Self::execute_command_job(
570 command_ctx,
571 &command_priority_tx,
572 command_drop_logger.as_ref(),
573 command_client_id,
574 job,
575 );
576 })
577 .await
578 {
579 Ok(()) => {}
580 Err(e) => {
581 log::error!("Command worker panicked: {}", e);
582 }
583 }
584 if let Ok(mut map) = started_map.lock() {
586 map.remove(&tx_id);
587 }
588 }
589 });
590
591 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
595 outbound_ttl_interval.tick().await; loop {
597 tokio::select! {
598 Some(ready) = subscribe_rx.recv() => {
600 let tx_id = match &ready {
601 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
602 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
603 };
604 if let Ok(mut map) = subscribe_started_by_tx.lock() {
605 map.remove(&tx_id);
606 }
607 match ready {
608 SubscriptionReady::Query { tx_id, cellmap, window } => {
609 session.subscribe_query(tx_id, cellmap, window);
610 }
611 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
612 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
613 }
614 }
615 }
616 _ = outbound_ttl_interval.tick() => {
620 if let Ok(mut map) = outbound_commands_by_tx.lock() {
621 let before = map.len();
622 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
623 let removed = before - map.len();
624 if removed > 0 {
625 log::debug!(
626 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
627 session.client_id,
628 removed,
629 map.len()
630 );
631 }
632 }
633 }
634 msg = read.next() => {
636 let Some(msg) = msg else { break };
637 let ctx = ctx.clone();
638 let msg = match msg {
639 Ok(m) => m,
640 Err(e) => {
641 log::error!("WebSocket read error from {}: {}", client_id, e);
642 break;
643 }
644 };
645
646 match msg {
647 Message::Binary(data) => {
648 if !use_binary.load(Ordering::SeqCst) {
649 log::debug!(
650 "Client {} switched to binary (msgpack) protocol via auto-detect",
651 client_id
652 );
653 use_binary.store(true, Ordering::SeqCst);
654 }
655
656 match rmp_serde::from_slice::<MykoMessage>(&data) {
657 Ok(myko_msg) => {
658 if let Err(e) = Self::handle_message(
659 &mut session,
660 ctx,
661 &priority_tx,
662 &drop_logger,
663 &query_ids_by_tx,
664 &view_ids_by_tx,
665 &subscribe_started_by_tx,
666 &command_started_by_tx,
667 &outbound_commands_by_tx,
668 &command_tx,
669 &subscribe_tx,
670 myko_msg,
671 ) {
672 log::error!("Error handling message: {}", e);
673 }
674 tokio::task::yield_now().await;
675 }
676 Err(e) => {
677 log::warn!("Failed to parse message from {}: {}", client_id, e);
678 }
679 }
680 }
681 Message::Text(text) => {
682 if text == SWITCH_TO_MSGPACK {
683 log::debug!(
684 "Client {} switched to binary (msgpack) protocol via explicit request",
685 client_id
686 );
687 if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
688 protocol: "msgpack".into(),
689 }) {
690 drop_logger.on_drop("ProtocolSwitch", &e);
691 }
692 use_binary.store(true, Ordering::SeqCst);
693 continue;
694 }
695
696 match serde_json::from_str::<MykoMessage>(&text) {
697 Ok(myko_msg) => {
698 if let Err(e) = Self::handle_message(
699 &mut session,
700 ctx,
701 &priority_tx,
702 &drop_logger,
703 &query_ids_by_tx,
704 &view_ids_by_tx,
705 &subscribe_started_by_tx,
706 &command_started_by_tx,
707 &outbound_commands_by_tx,
708 &command_tx,
709 &subscribe_tx,
710 myko_msg,
711 ) {
712 log::error!("Error handling message: {}", e);
713 }
714 tokio::task::yield_now().await;
715 }
716 Err(e) => {
717 log::warn!(
718 "Failed to parse JSON message from {}: {} | raw: {}",
719 client_id,
720 e,
721 if text.len() > 1000 {
722 &text[..1000]
723 } else {
724 &text
725 }
726 );
727 }
728 }
729 }
730 Message::Ping(data) => {
731 log::trace!("Ping from {}", client_id);
732 let _ = data;
733 }
734 Message::Pong(_) => {
735 log::trace!("Pong from {}", client_id);
736 }
737 Message::Close(frame) => {
738 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
739 break;
740 }
741 Message::Frame(_) => {}
742 }
743 }
744 }
745 }
746
747 drop(session); write_task.abort();
750 command_task.abort();
751
752 if let Some(registry) = try_client_registry() {
754 registry.unregister(&client_id);
755 }
756
757 if let Err(e) = ctx.del(&client_entity) {
759 log::error!("Failed to delete client entity: {e}");
760 }
761
762 log::info!("Client disconnected: {}", client_id);
763
764 Ok(())
765 }
766
767 #[allow(clippy::too_many_arguments)]
769 fn handle_message<W: WsWriter>(
770 session: &mut ClientSession<W>,
771 ctx: Arc<CellServerCtx>,
772 priority_tx: &mpsc::Sender<MykoMessage>,
773 drop_logger: &Arc<DropLogger>,
774 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
775 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
776 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
777 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
778 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
779 command_tx: &mpsc::UnboundedSender<CommandJob>,
780 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
781 msg: MykoMessage,
782 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
783 let handler_registry = ctx.handler_registry.clone();
784
785 let registry = ctx.registry.clone();
786
787 let host_id = ctx.host_id;
788
789 match msg {
790 MykoMessage::Query(wrapped) => {
791 let tx_id: Arc<str> = wrapped
793 .query
794 .get("tx")
795 .and_then(|v| v.as_str())
796 .unwrap_or("unknown")
797 .into();
798 let query_id = &wrapped.query_id;
799 let entity_type = &wrapped.query_item_type;
800
801 if session.has_subscription(&tx_id) {
804 log::debug!(
805 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
806 session.client_id,
807 tx_id,
808 query_id,
809 entity_type
810 );
811 return Ok(());
812 }
813 if let Ok(mut map) = query_ids_by_tx.lock() {
814 map.insert(tx_id.clone(), query_id.clone());
815 }
816 if let Ok(mut map) = subscribe_started_by_tx.lock() {
817 map.entry(tx_id.clone()).or_insert_with(Instant::now);
818 }
819
820 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
821 log::trace!(
822 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
823 session.client_id,
824 tx_id,
825 query_id,
826 entity_type,
827 wrapped.window.is_some(),
828 session.subscription_count()
829 );
830
831 let request_context = Arc::new(RequestContext::from_client(
832 tx_id.clone(),
833 session.client_id.clone(),
834 host_id,
835 ));
836
837 if let Some(query_data) = handler_registry.get_query(query_id) {
838 let parsed = (query_data.parse)(wrapped.query.clone());
839 match parsed {
840 Ok(any_query) => {
841 let cell_factory = query_data.cell_factory;
844 let registry = registry.clone();
845 let request_context = request_context.clone();
846 let ctx = ctx.clone();
847 let window = wrapped.window.clone();
848 let query_id = query_id.clone();
849 let sub_tx = subscribe_tx.clone();
850 tokio::task::spawn_blocking(move || {
851 match cell_factory(any_query, registry, request_context, Some(ctx))
852 {
853 Ok(filtered_cellmap) => {
854 let _ = sub_tx.send(SubscriptionReady::Query {
855 tx_id,
856 cellmap: filtered_cellmap,
857 window,
858 });
859 }
860 Err(e) => {
861 log::error!(
862 "Failed to create query cell for {}: {}",
863 query_id,
864 e
865 );
866 }
867 }
868 });
869 }
870 Err(e) => {
871 log::error!(
872 "Failed to parse query {}: {} | payload: {}",
873 query_id,
874 e,
875 serde_json::to_string(&wrapped.query).unwrap_or_default()
876 );
877 }
878 }
879 } else {
880 log::warn!(
882 "No registered query handler for {}, falling back to select all",
883 query_id
884 );
885 let cellmap = registry.get_or_create(entity_type).select(|_| true);
886 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
887 }
888 }
889
890 MykoMessage::View(wrapped) => {
891 let tx_id: Arc<str> = wrapped
892 .view
893 .get("tx")
894 .and_then(|v| v.as_str())
895 .unwrap_or("unknown")
896 .into();
897 let view_id = &wrapped.view_id;
898 let item_type = &wrapped.view_item_type;
899
900 if session.has_subscription(&tx_id) {
902 log::debug!(
903 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
904 session.client_id,
905 tx_id,
906 view_id,
907 item_type
908 );
909 return Ok(());
910 }
911 if let Ok(mut map) = view_ids_by_tx.lock() {
912 map.insert(tx_id.clone(), view_id.clone());
913 }
914 if let Ok(mut map) = subscribe_started_by_tx.lock() {
915 map.entry(tx_id.clone()).or_insert_with(Instant::now);
916 }
917
918 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
919 log::trace!(
920 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
921 session.client_id,
922 tx_id,
923 view_id,
924 item_type,
925 wrapped.window
926 );
927
928 let request_context = Arc::new(RequestContext::from_client(
929 tx_id.clone(),
930 session.client_id.clone(),
931 host_id,
932 ));
933
934 if let Some(view_data) = handler_registry.get_view(view_id) {
935 let parsed = (view_data.parse)(wrapped.view.clone());
936 match parsed {
937 Ok(any_view) => {
938 log::trace!(
939 "View parsed successfully client={} tx={} view_id={}",
940 session.client_id,
941 tx_id,
942 view_id
943 );
944 let cell_factory = view_data.cell_factory;
947 let registry = registry.clone();
948 let ctx = ctx.clone();
949 let window = wrapped.window.clone();
950 let view_id_clone = view_id.clone();
951 let sub_tx = subscribe_tx.clone();
952 let priority_tx = priority_tx.clone();
953 let drop_logger = drop_logger.clone();
954 tokio::task::spawn_blocking(move || {
955 match cell_factory(any_view, registry, request_context, ctx) {
956 Ok(filtered_cellmap) => {
957 log::trace!(
958 "View cell factory succeeded tx={} view_id={}",
959 tx_id,
960 view_id_clone
961 );
962 let _ = sub_tx.send(SubscriptionReady::View {
963 tx_id,
964 view_id: view_id_clone,
965 cellmap: filtered_cellmap,
966 window,
967 });
968 }
969 Err(e) => {
970 log::error!(
971 "Failed to create view cell for {}: {}",
972 view_id_clone,
973 e
974 );
975 if let Err(err) = priority_tx.try_send(
976 MykoMessage::ViewError(ViewError {
977 tx: tx_id.to_string(),
978 view_id: view_id_clone.to_string(),
979 message: e,
980 }),
981 ) {
982 drop_logger.on_drop("ViewError", &err);
983 }
984 }
985 }
986 });
987 }
988 Err(e) => {
989 let message = format!("Failed to parse view {}: {}", view_id, e);
990 log::error!(
991 "{} | payload: {}",
992 message,
993 serde_json::to_string(&wrapped.view).unwrap_or_default()
994 );
995 if let Err(err) =
996 priority_tx.try_send(MykoMessage::ViewError(ViewError {
997 tx: tx_id.to_string(),
998 view_id: view_id.to_string(),
999 message,
1000 }))
1001 {
1002 drop_logger.on_drop("ViewError", &err);
1003 }
1004 }
1005 }
1006 } else {
1007 let message = format!("No registered handler for view: {}", view_id);
1008 log::warn!("{}", message);
1009 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1010 tx: tx_id.to_string(),
1011 view_id: view_id.to_string(),
1012 message,
1013 })) {
1014 drop_logger.on_drop("ViewError", &err);
1015 }
1016 }
1017 }
1018
1019 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1020 log::trace!(
1021 "QueryCancel received: client={} tx={}",
1022 session.client_id,
1023 tx_id
1024 );
1025 let tx_id: Arc<str> = tx_id.into();
1026 if let Ok(mut map) = query_ids_by_tx.lock() {
1027 map.remove(&tx_id);
1028 }
1029 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1030 map.remove(&tx_id);
1031 }
1032 session.cancel(&tx_id);
1033 }
1034
1035 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1036 let tx_id: Arc<str> = tx.into();
1037 log::trace!(
1038 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1039 session.client_id,
1040 tx_id,
1041 window.is_some(),
1042 session.subscription_count()
1043 );
1044 session.update_query_window(&tx_id, window);
1045 }
1046 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1047 log::trace!("View cancel: {}", tx_id);
1048 let tx_id: Arc<str> = tx_id.into();
1049 if let Ok(mut map) = view_ids_by_tx.lock() {
1050 map.remove(&tx_id);
1051 }
1052 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1053 map.remove(&tx_id);
1054 }
1055 session.cancel(&tx_id);
1056 }
1057 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1058 let tx_id: Arc<str> = tx.into();
1059 log::trace!("View window update: {}", tx_id);
1060 session.update_view_window(&tx_id, window);
1061 }
1062
1063 MykoMessage::Report(wrapped) => {
1064 let tx_id: Arc<str> = wrapped
1066 .report
1067 .get("tx")
1068 .and_then(|v| v.as_str())
1069 .unwrap_or("unknown")
1070 .into();
1071 let report_id = &wrapped.report_id;
1072
1073 log::trace!(
1074 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1075 session.client_id,
1076 tx_id,
1077 report_id,
1078 session.subscription_count()
1079 );
1080
1081 if let Some(report_data) = handler_registry.get_report(report_id) {
1083 let parsed = (report_data.parse)(wrapped.report.clone());
1085 match parsed {
1086 Ok(any_report) => {
1087 let request_context = Arc::new(RequestContext::from_client(
1088 tx_id.clone(),
1089 session.client_id.clone(),
1090 host_id,
1091 ));
1092
1093 match (report_data.cell_factory)(any_report, request_context, ctx) {
1095 Ok(cell) => {
1096 session.subscribe_report(
1097 tx_id,
1098 report_id.as_str().into(),
1099 cell,
1100 );
1101 }
1102 Err(e) => {
1103 log::error!(
1104 "Failed to create report cell for {}: {}",
1105 report_id,
1106 e
1107 );
1108 }
1109 }
1110 }
1111 Err(e) => {
1112 log::error!(
1113 "Failed to parse report {}: {} | payload: {}",
1114 report_id,
1115 e,
1116 serde_json::to_string(&wrapped.report).unwrap_or_default()
1117 );
1118 }
1119 }
1120 } else {
1121 log::warn!("No registered handler for report: {}", report_id);
1122 }
1123 }
1124
1125 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1126 log::trace!(
1127 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1128 session.client_id,
1129 tx_id,
1130 session.subscription_count()
1131 );
1132 session.cancel(&tx_id.into());
1133 }
1134
1135 MykoMessage::Event(mut event) => {
1136 record_ws_ingest(std::slice::from_ref(&event));
1137 event.sanitize_null_bytes();
1138 normalize_incoming_event(&mut event, &session.client_id);
1139 if let Err(e) = ctx.apply_event(event) {
1140 log::error!(
1141 "Failed to apply event from client {}: {e}",
1142 session.client_id
1143 );
1144 }
1145 }
1146
1147 MykoMessage::EventBatch(mut events) => {
1148 record_ws_ingest(&events);
1149 let incoming = events.len();
1150 if incoming >= 64 {
1151 log::trace!(
1152 "Received event batch from client {} size={}",
1153 session.client_id,
1154 incoming
1155 );
1156 }
1157 for event in &mut events {
1158 event.sanitize_null_bytes();
1159 normalize_incoming_event(event, &session.client_id);
1160 }
1161 match ctx.apply_event_batch(events) {
1162 Ok(applied) => {
1163 log::trace!(
1164 "Applied event batch from client {} size={}",
1165 session.client_id,
1166 applied
1167 );
1168 }
1169 Err(e) => {
1170 log::error!(
1171 "Failed to apply event batch from client {}: {}",
1172 session.client_id,
1173 e
1174 );
1175 }
1176 }
1177 }
1178
1179 MykoMessage::Command(wrapped) => {
1180 let tx_id: Arc<str> = wrapped
1182 .command
1183 .get("tx")
1184 .and_then(|v| v.as_str())
1185 .unwrap_or("unknown")
1186 .into();
1187
1188 let command_id = &wrapped.command_id;
1189
1190 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1191 let received_at = Instant::now();
1192 if let Ok(mut map) = command_started_by_tx.lock() {
1193 map.insert(tx_id.clone(), received_at);
1194 }
1195 if let Err(e) = command_tx.send(CommandJob {
1196 tx_id: tx_id.clone(),
1197 command_id: wrapped.command_id.clone(),
1198 command: wrapped.command.clone(),
1199 received_at,
1200 }) {
1201 log::error!(
1202 "Failed to enqueue command {} for client {} tx={}: {}",
1203 command_id,
1204 session.client_id,
1205 tx_id,
1206 e
1207 );
1208 let error = MykoMessage::CommandError(CommandError {
1209 tx: tx_id.to_string(),
1210 command_id: command_id.to_string(),
1211 message: "Command queue unavailable".to_string(),
1212 });
1213 if let Err(err) = priority_tx.try_send(error) {
1214 drop_logger.on_drop("CommandError", &err);
1215 }
1216 }
1217 }
1218
1219 MykoMessage::Ping(PingData { id, timestamp }) => {
1220 let pong = MykoMessage::Ping(PingData { id, timestamp });
1222 if let Err(e) = priority_tx.try_send(pong) {
1223 drop_logger.on_drop("Ping", &e);
1224 }
1225 }
1226
1227 MykoMessage::QueryResponse(resp) => {
1229 log::warn!(
1230 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1231 session.client_id,
1232 resp.tx,
1233 resp.sequence,
1234 resp.upserts.len(),
1235 resp.deletes.len(),
1236 session.subscription_count()
1237 );
1238 }
1239 MykoMessage::QueryError(err) => {
1240 log::warn!(
1241 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1242 session.client_id,
1243 err.tx,
1244 err.query_id,
1245 err.message,
1246 session.subscription_count()
1247 );
1248 }
1249 MykoMessage::ViewResponse(resp) => {
1250 log::warn!(
1251 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1252 session.client_id,
1253 resp.tx,
1254 resp.sequence,
1255 resp.upserts.len(),
1256 resp.deletes.len(),
1257 session.subscription_count()
1258 );
1259 }
1260 MykoMessage::ViewError(err) => {
1261 log::warn!(
1262 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1263 session.client_id,
1264 err.tx,
1265 err.view_id,
1266 err.message,
1267 session.subscription_count()
1268 );
1269 }
1270 MykoMessage::ReportResponse(resp) => {
1271 log::warn!(
1272 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1273 session.client_id,
1274 resp.tx,
1275 session.subscription_count()
1276 );
1277 }
1278 MykoMessage::ReportError(err) => {
1279 log::warn!(
1280 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1281 session.client_id,
1282 err.tx,
1283 err.report_id,
1284 err.message,
1285 session.subscription_count()
1286 );
1287 }
1288 MykoMessage::CommandResponse(resp) => {
1289 if resp.tx.trim().is_empty() {
1290 log::warn!(
1291 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1292 session.client_id,
1293 session.subscription_count()
1294 );
1295 } else {
1296 let correlated = outbound_commands_by_tx
1297 .lock()
1298 .ok()
1299 .and_then(|mut map| map.remove(&resp.tx));
1300 if let Some((command_id, started)) = correlated {
1301 log::debug!(
1302 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1303 session.client_id,
1304 resp.tx,
1305 command_id,
1306 started.elapsed().as_millis(),
1307 session.subscription_count()
1308 );
1309 } else {
1310 log::warn!(
1311 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1312 session.client_id,
1313 resp.tx,
1314 session.subscription_count()
1315 );
1316 }
1317 }
1318 }
1319 MykoMessage::CommandError(err) => {
1320 if err.tx.trim().is_empty() {
1321 log::warn!(
1322 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1323 session.client_id,
1324 err.command_id,
1325 err.message,
1326 session.subscription_count()
1327 );
1328 } else {
1329 let correlated = outbound_commands_by_tx
1330 .lock()
1331 .ok()
1332 .and_then(|mut map| map.remove(&err.tx));
1333 if let Some((command_id, started)) = correlated {
1334 log::warn!(
1335 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1336 session.client_id,
1337 err.tx,
1338 err.command_id,
1339 command_id,
1340 err.message,
1341 started.elapsed().as_millis(),
1342 session.subscription_count()
1343 );
1344 } else {
1345 log::warn!(
1346 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1347 session.client_id,
1348 err.tx,
1349 err.command_id,
1350 err.message,
1351 session.subscription_count()
1352 );
1353 }
1354 }
1355 }
1356 MykoMessage::Benchmark(payload) => {
1357 let stats = ws_benchmark_stats();
1358 ensure_ws_benchmark_logger();
1359 stats.message_count.fetch_add(1, Ordering::Relaxed);
1360 let size = payload.to_string().len() as u64;
1362 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1363 }
1364 MykoMessage::ProtocolSwitch { protocol } => {
1365 log::warn!(
1366 "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1367 session.client_id,
1368 protocol,
1369 session.subscription_count()
1370 );
1371 }
1372 }
1373
1374 Ok(())
1375 }
1376
1377 fn execute_command_job(
1378 ctx: Arc<CellServerCtx>,
1379 priority_tx: &mpsc::Sender<MykoMessage>,
1380 drop_logger: &DropLogger,
1381 client_id: Arc<str>,
1382 job: CommandJob,
1383 ) {
1384 let host_id = ctx.host_id;
1385 let started = Instant::now();
1386 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1387 let command_id = job.command_id.clone();
1388
1389 let mut handler_found = false;
1390 for registration in inventory::iter::<CommandHandlerRegistration> {
1391 if registration.command_id == command_id {
1392 handler_found = true;
1393 let executor = (registration.factory)();
1394
1395 let req = Arc::new(RequestContext::from_client(
1396 job.tx_id.clone(),
1397 client_id.clone(),
1398 host_id,
1399 ));
1400 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1401 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1402 let execute_started = Instant::now();
1403
1404 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1405 Ok(result) => {
1406 let response = MykoMessage::CommandResponse(CommandResponse {
1407 response: result,
1408 tx: job.tx_id.to_string(),
1409 });
1410 if let Err(e) = priority_tx.try_send(response) {
1411 drop_logger.on_drop("CommandResponse", &e);
1412 }
1413 }
1414 Err(e) => {
1415 let error = MykoMessage::CommandError(CommandError {
1416 tx: job.tx_id.to_string(),
1417 command_id: command_id.clone(),
1418 message: e.message,
1419 });
1420 if let Err(err) = priority_tx.try_send(error) {
1421 drop_logger.on_drop("CommandError", &err);
1422 }
1423 }
1424 }
1425 let execute_ms = execute_started.elapsed().as_millis();
1426 let total_ms = job.received_at.elapsed().as_millis();
1427 log::trace!(
1428 target: "myko_server::ws_perf",
1429 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1430 client_id,
1431 job.tx_id,
1432 command_id,
1433 queue_wait_ms,
1434 execute_ms,
1435 total_ms
1436 );
1437 break;
1438 }
1439 }
1440
1441 if !handler_found {
1442 log::warn!("No registered handler for command: {}", command_id);
1443 let error = MykoMessage::CommandError(CommandError {
1444 tx: job.tx_id.to_string(),
1445 command_id: command_id.clone(),
1446 message: format!("Command handler not found: {}", command_id),
1447 });
1448 if let Err(e) = priority_tx.try_send(error) {
1449 drop_logger.on_drop("CommandError", &e);
1450 }
1451 }
1452
1453 if !handler_found {
1454 log::debug!(
1455 target: "myko_server::ws_perf",
1456 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1457 client_id,
1458 job.tx_id,
1459 command_id,
1460 queue_wait_ms,
1461 job.received_at.elapsed().as_millis()
1462 );
1463 }
1464 }
1465}
1466
1467struct ChannelWriter {
1472 tx: mpsc::Sender<OutboundMessage>,
1473 deferred_tx: mpsc::Sender<DeferredOutbound>,
1474 drop_logger: Arc<DropLogger>,
1475 use_binary_writer: Arc<AtomicBool>,
1476}
1477
1478impl WsWriter for ChannelWriter {
1479 fn send(&self, msg: MykoMessage) {
1480 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1482 self.drop_logger.on_drop("message", &e);
1483 }
1484 }
1485
1486 fn protocol(&self) -> myko::client::MykoProtocol {
1487 if self.use_binary_writer.load(Ordering::SeqCst) {
1488 myko::client::MykoProtocol::MSGPACK
1489 } else {
1490 myko::client::MykoProtocol::JSON
1491 }
1492 }
1493
1494 fn send_serialized_command(
1495 &self,
1496 tx: Arc<str>,
1497 command_id: String,
1498 payload: EncodedCommandMessage,
1499 ) {
1500 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1501 tx,
1502 command_id,
1503 payload,
1504 }) {
1505 self.drop_logger.on_drop("serialized_command", &e);
1506 }
1507 }
1508
1509 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1510 if let Err(e) = self
1511 .deferred_tx
1512 .try_send(DeferredOutbound::Report(tx, output))
1513 {
1514 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1515 }
1516 }
1517
1518 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1519 if let Err(e) = self
1520 .deferred_tx
1521 .try_send(DeferredOutbound::Query { response, is_view })
1522 {
1523 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1524 }
1525 }
1526}
1527
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` arrives on the outbound channel
    /// wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is never read in this test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let received = outbound_rx.try_recv().unwrap();
        let is_ping = matches!(received, OutboundMessage::Message(MykoMessage::Ping(_)));
        assert!(is_ping);
    }
}