1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24 report::AnyOutput,
25 request::RequestContext,
26 server::{
27 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28 client_registry::try_client_registry,
29 },
30 wire::{
31 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33 },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37 accept_async_with_config,
38 tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
42const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
46struct WsIngestStats {
47 counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
48}
49
50struct WsBenchmarkStats {
51 message_count: AtomicU64,
52 total_bytes: AtomicU64,
53}
54
55static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
56static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
57static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
58static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 if count > 0 { bytes / count } else { 0 },
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 if let Some(obj) = event.item.as_object_mut()
194 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195 {
196 for reg in iter_fallback_to_id_registrations() {
197 if reg.entity_type == event.item_type {
198 let field = reg.field_name_json;
199 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201 }
202 }
203 }
204 }
205}
206
207struct DropLogger {
212 client_id: Arc<str>,
213 dropped: std::sync::atomic::AtomicU64,
214 last_log_ms: std::sync::atomic::AtomicU64,
215}
216
217impl DropLogger {
218 fn new(client_id: Arc<str>) -> Self {
219 Self {
220 client_id,
221 dropped: std::sync::atomic::AtomicU64::new(0),
222 last_log_ms: std::sync::atomic::AtomicU64::new(0),
223 }
224 }
225
226 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227 use std::sync::atomic::Ordering;
228
229 self.dropped.fetch_add(1, Ordering::Relaxed);
230
231 let now_ms = std::time::SystemTime::now()
233 .duration_since(std::time::UNIX_EPOCH)
234 .unwrap_or_default()
235 .as_millis() as u64;
236 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237 if now_ms.saturating_sub(last_ms) < 1000 {
238 return;
239 }
240
241 if self
242 .last_log_ms
243 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244 .is_err()
245 {
246 return;
247 }
248
249 let n = self.dropped.swap(0, Ordering::Relaxed);
250 log::warn!(
251 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252 n,
253 self.client_id,
254 kind,
255 err
256 );
257 }
258}
259
260struct CommandJob {
261 tx_id: Arc<str>,
262 command_id: String,
263 command: serde_json::Value,
264 received_at: Instant,
265}
266
267enum SubscriptionReady {
269 Query {
270 tx_id: Arc<str>,
271 cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
272 window: Option<myko::wire::QueryWindow>,
273 },
274 View {
275 tx_id: Arc<str>,
276 view_id: Arc<str>,
277 cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
278 window: Option<myko::wire::QueryWindow>,
279 },
280}
281
282enum OutboundMessage {
283 Message(MykoMessage),
284 SerializedCommand {
285 tx: Arc<str>,
286 command_id: String,
287 payload: EncodedCommandMessage,
288 },
289}
290
291enum DeferredOutbound {
292 Report(Arc<str>, Arc<dyn AnyOutput>),
293 Query {
294 response: PendingQueryResponse,
295 is_view: bool,
296 },
297}
298
299pub struct WsHandler;
301
302impl WsHandler {
303 #[allow(clippy::too_many_arguments)]
305 pub async fn handle_connection(
306 stream: TcpStream,
307 addr: SocketAddr,
308 ctx: Arc<CellServerCtx>,
309 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310 ensure_ws_ingest_logger();
311
312 let host_id = ctx.host_id;
313
314 let mut ws_config = WebSocketConfig::default();
315 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318 let (mut write, mut read) = ws_stream.split();
319
320 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328 let use_binary = Arc::new(AtomicBool::new(false));
330
331 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334 let writer = ChannelWriter {
335 tx: tx.clone(),
336 deferred_tx: deferred_tx.clone(),
337 drop_logger: drop_logger.clone(),
338 use_binary_writer: use_binary.clone(),
339 };
340
341 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343 tx: tx.clone(),
344 deferred_tx: deferred_tx.clone(),
345 drop_logger: drop_logger.clone(),
346 use_binary_writer: use_binary.clone(),
347 });
348 if let Some(registry) = try_client_registry() {
349 registry.register(client_id.clone(), writer_arc);
350 }
351
352 let mut session = ClientSession::new(client_id.clone(), writer);
353
354 let use_binary_writer = use_binary.clone();
355 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356 Arc::new(Mutex::new(HashMap::new()));
357 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358 Arc::new(Mutex::new(HashMap::new()));
359 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360 Arc::new(Mutex::new(HashMap::new()));
361 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362 Arc::new(Mutex::new(HashMap::new()));
363 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364 Arc::new(Mutex::new(HashMap::new()));
365
366 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368 let client_entity = Client {
370 id: ClientId(client_id.clone()),
371 server_id: host_id.to_string().into(),
372 windback: None,
373 };
374 if let Err(e) = ctx.set(&client_entity) {
375 log::error!("Failed to persist client entity: {e}");
376 }
377
378 log::info!("Client connected: {} from {}", client_id, addr);
379
380 let write_ctx = ctx.clone();
381 let write_client_id = client_id.clone();
382 let write_addr = addr;
383 let command_ctx = ctx.clone();
384 let command_priority_tx = priority_tx.clone();
385 let command_drop_logger = drop_logger.clone();
386 let command_client_id = client_id.clone();
387
388 let write_task = tokio::spawn(async move {
390 let _ctx = write_ctx;
391 let mut normal_open = true;
392 let mut priority_open = true;
393 let mut deferred_open = true;
394 while normal_open || priority_open || deferred_open {
395 let msg = tokio::select! {
396 biased;
397 maybe = priority_rx.recv(), if priority_open => {
398 match maybe {
399 Some(msg) => OutboundMessage::Message(msg),
400 None => {
401 priority_open = false;
402 continue;
403 }
404 }
405 }
406 maybe = deferred_rx.recv(), if deferred_open => {
407 match maybe {
408 Some(DeferredOutbound::Report(tx, output)) => {
409 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
410 response: output.to_value(),
411 tx: tx.to_string(),
412 }))
413 }
414 Some(DeferredOutbound::Query { response, is_view }) => {
415 if is_view {
416 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
417 } else {
418 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
419 }
420 }
421 None => {
422 deferred_open = false;
423 continue;
424 }
425 }
426 }
427 maybe = rx.recv(), if normal_open => {
428 match maybe {
429 Some(msg) => msg,
430 None => {
431 normal_open = false;
432 continue;
433 }
434 }
435 }
436 };
437 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
438 OutboundMessage::SerializedCommand { tx, .. } => {
439 ("command", Some(tx.clone()), None, None, None, None)
440 }
441 OutboundMessage::Message(msg) => match msg {
442 MykoMessage::ViewResponse(r) => (
443 "view_response",
444 Some(r.tx.clone()),
445 Some(r.sequence),
446 Some(r.upserts.len()),
447 Some(r.deletes.len()),
448 r.total_count,
449 ),
450 MykoMessage::QueryResponse(r) => (
451 "query_response",
452 Some(r.tx.clone()),
453 Some(r.sequence),
454 Some(r.upserts.len()),
455 Some(r.deletes.len()),
456 r.total_count,
457 ),
458 MykoMessage::CommandResponse(r) => (
459 "command_response",
460 Some(Arc::<str>::from(r.tx.clone())),
461 None,
462 None,
463 None,
464 None,
465 ),
466 MykoMessage::CommandError(r) => (
467 "command_error",
468 Some(Arc::<str>::from(r.tx.clone())),
469 None,
470 None,
471 None,
472 None,
473 ),
474 _ => ("other", None, None, None, None, None),
475 },
476 };
477
478 match &msg {
479 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
480 if !tx.trim().is_empty()
481 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
482 {
483 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
484 }
485 }
486 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
487 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
488 && !tx_id.trim().is_empty()
489 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
490 {
491 map.insert(
492 tx_id.to_string(),
493 (wrapped.command_id.clone(), Instant::now()),
494 );
495 }
496 }
497 _ => {}
498 }
499
500 let ws_msg = match &msg {
501 OutboundMessage::SerializedCommand {
502 payload: EncodedCommandMessage::Json(json),
503 ..
504 } => Message::Text(json.clone().into()),
505 OutboundMessage::SerializedCommand {
506 payload: EncodedCommandMessage::Msgpack(bytes),
507 ..
508 } => Message::Binary(bytes.clone().into()),
509 OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
510 match rmp_serde::to_vec(msg) {
511 Ok(bytes) => Message::Binary(bytes.into()),
512 Err(e) => {
513 log::error!("Failed to serialize message to msgpack: {}", e);
514 continue;
515 }
516 }
517 }
518 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
519 Ok(json) => Message::Text(json.into()),
520 Err(e) => {
521 log::error!("Failed to serialize message to JSON: {}", e);
522 continue;
523 }
524 },
525 };
526 let payload_bytes = match &ws_msg {
527 Message::Binary(b) => b.len(),
528 Message::Text(t) => t.len(),
529 _ => 0,
530 };
531
532 if let Err(err) = write.send(ws_msg).await {
533 log::error!(
534 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
535 write_client_id,
536 write_addr,
537 kind,
538 tx_id,
539 seq,
540 payload_bytes,
541 use_binary_writer.load(Ordering::SeqCst),
542 err
543 );
544 break;
545 }
546 }
547 log::warn!(
548 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
549 write_client_id,
550 write_addr,
551 normal_open,
552 priority_open,
553 deferred_open
554 );
555 });
556
557 let command_started_cleanup = command_started_by_tx.clone();
560 let command_task = tokio::spawn(async move {
561 while let Some(job) = command_rx.recv().await {
562 let command_ctx = command_ctx.clone();
563 let command_priority_tx = command_priority_tx.clone();
564 let command_drop_logger = command_drop_logger.clone();
565 let command_client_id = command_client_id.clone();
566 let tx_id = job.tx_id.clone();
567 let started_map = command_started_cleanup.clone();
568 match tokio::task::spawn_blocking(move || {
569 Self::execute_command_job(
570 command_ctx,
571 &command_priority_tx,
572 command_drop_logger.as_ref(),
573 command_client_id,
574 job,
575 );
576 })
577 .await
578 {
579 Ok(()) => {}
580 Err(e) => {
581 log::error!("Command worker panicked: {}", e);
582 }
583 }
584 if let Ok(mut map) = started_map.lock() {
586 map.remove(&tx_id);
587 }
588 }
589 });
590
591 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
595 outbound_ttl_interval.tick().await; loop {
597 tokio::select! {
598 Some(ready) = subscribe_rx.recv() => {
600 let tx_id = match &ready {
601 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
602 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
603 };
604 if let Ok(mut map) = subscribe_started_by_tx.lock() {
605 map.remove(&tx_id);
606 }
607 match ready {
608 SubscriptionReady::Query { tx_id, cellmap, window } => {
609 session.subscribe_query(tx_id, cellmap, window);
610 }
611 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
612 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
613 }
614 }
615 }
616 _ = outbound_ttl_interval.tick() => {
620 if let Ok(mut map) = outbound_commands_by_tx.lock() {
621 let before = map.len();
622 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
623 let removed = before - map.len();
624 if removed > 0 {
625 log::debug!(
626 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
627 session.client_id,
628 removed,
629 map.len()
630 );
631 }
632 }
633 }
634 msg = read.next() => {
636 let Some(msg) = msg else { break };
637 let ctx = ctx.clone();
638 let msg = match msg {
639 Ok(m) => m,
640 Err(e) => {
641 log::error!("WebSocket read error from {}: {}", client_id, e);
642 break;
643 }
644 };
645
646 match msg {
647 Message::Binary(data) => {
648 if !use_binary.load(Ordering::SeqCst) {
649 log::debug!(
650 "Client {} switched to binary (msgpack) protocol via auto-detect",
651 client_id
652 );
653 use_binary.store(true, Ordering::SeqCst);
654 }
655
656 match rmp_serde::from_slice::<MykoMessage>(&data) {
657 Ok(myko_msg) => {
658 if let Err(e) = Self::handle_message(
659 &mut session,
660 ctx,
661 &priority_tx,
662 &drop_logger,
663 &query_ids_by_tx,
664 &view_ids_by_tx,
665 &subscribe_started_by_tx,
666 &command_started_by_tx,
667 &outbound_commands_by_tx,
668 &command_tx,
669 &subscribe_tx,
670 myko_msg,
671 ) {
672 log::error!("Error handling message: {}", e);
673 }
674 tokio::task::yield_now().await;
675 }
676 Err(e) => {
677 log::warn!("Failed to parse message from {}: {}", client_id, e);
678 }
679 }
680 }
681 Message::Text(text) => {
682 if text == SWITCH_TO_MSGPACK {
683 log::debug!(
684 "Client {} switched to binary (msgpack) protocol via explicit request",
685 client_id
686 );
687 if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
688 protocol: "msgpack".into(),
689 }) {
690 drop_logger.on_drop("ProtocolSwitch", &e);
691 }
692 use_binary.store(true, Ordering::SeqCst);
693 continue;
694 }
695
696 match serde_json::from_str::<MykoMessage>(&text) {
697 Ok(myko_msg) => {
698 if let Err(e) = Self::handle_message(
699 &mut session,
700 ctx,
701 &priority_tx,
702 &drop_logger,
703 &query_ids_by_tx,
704 &view_ids_by_tx,
705 &subscribe_started_by_tx,
706 &command_started_by_tx,
707 &outbound_commands_by_tx,
708 &command_tx,
709 &subscribe_tx,
710 myko_msg,
711 ) {
712 log::error!("Error handling message: {}", e);
713 }
714 tokio::task::yield_now().await;
715 }
716 Err(e) => {
717 log::warn!(
718 "Failed to parse JSON message from {}: {} | raw: {}",
719 client_id,
720 e,
721 if text.len() > 1000 {
722 &text[..1000]
723 } else {
724 &text
725 }
726 );
727 }
728 }
729 }
730 Message::Ping(data) => {
731 log::trace!("Ping from {}", client_id);
732 let _ = data;
733 }
734 Message::Pong(_) => {
735 log::trace!("Pong from {}", client_id);
736 }
737 Message::Close(frame) => {
738 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
739 break;
740 }
741 Message::Frame(_) => {}
742 }
743 }
744 }
745 }
746
747 drop(session); write_task.abort();
750 command_task.abort();
751
752 if let Some(registry) = try_client_registry() {
754 registry.unregister(&client_id);
755 }
756
757 if let Err(e) = ctx.del(&client_entity) {
759 log::error!("Failed to delete client entity: {e}");
760 }
761
762 log::info!("Client disconnected: {}", client_id);
763
764 Ok(())
765 }
766
767 #[allow(clippy::too_many_arguments)]
769 fn handle_message<W: WsWriter>(
770 session: &mut ClientSession<W>,
771 ctx: Arc<CellServerCtx>,
772 priority_tx: &mpsc::Sender<MykoMessage>,
773 drop_logger: &Arc<DropLogger>,
774 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
775 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
776 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
777 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
778 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
779 command_tx: &mpsc::UnboundedSender<CommandJob>,
780 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
781 msg: MykoMessage,
782 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
783 let handler_registry = ctx.handler_registry.clone();
784
785 let registry = ctx.registry.clone();
786
787 let host_id = ctx.host_id;
788
789 match msg {
790 MykoMessage::Query(wrapped) => {
791 let tx_id: Arc<str> = wrapped
793 .query
794 .get("tx")
795 .and_then(|v| v.as_str())
796 .unwrap_or("unknown")
797 .into();
798 let query_id = &wrapped.query_id;
799 let entity_type = &wrapped.query_item_type;
800
801 if session.has_subscription(&tx_id) {
804 log::debug!(
805 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
806 session.client_id,
807 tx_id,
808 query_id,
809 entity_type
810 );
811 return Ok(());
812 }
813 if let Ok(mut map) = query_ids_by_tx.lock() {
814 map.insert(tx_id.clone(), query_id.clone());
815 }
816 if let Ok(mut map) = subscribe_started_by_tx.lock() {
817 map.entry(tx_id.clone()).or_insert_with(Instant::now);
818 }
819
820 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
821 log::trace!(
822 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
823 session.client_id,
824 tx_id,
825 query_id,
826 entity_type,
827 wrapped.window.is_some(),
828 session.subscription_count()
829 );
830
831 let request_context = Arc::new(RequestContext::from_client(
832 tx_id.clone(),
833 session.client_id.clone(),
834 host_id,
835 ));
836
837 if let Some(query_data) = handler_registry.get_query(query_id) {
838 let parsed = (query_data.parse)(wrapped.query.clone());
839 match parsed {
840 Ok(any_query) => {
841 let cell_factory = query_data.cell_factory;
844 let registry = registry.clone();
845 let request_context = request_context.clone();
846 let ctx = ctx.clone();
847 let window = wrapped.window.clone();
848 let query_id = query_id.clone();
849 let sub_tx = subscribe_tx.clone();
850 tokio::task::spawn_blocking(move || {
851 match cell_factory(any_query, registry, request_context, Some(ctx))
852 {
853 Ok(filtered_cellmap) => {
854 let _ = sub_tx.send(SubscriptionReady::Query {
855 tx_id,
856 cellmap: filtered_cellmap,
857 window,
858 });
859 }
860 Err(e) => {
861 log::error!(
862 "Failed to create query cell for {}: {}",
863 query_id,
864 e
865 );
866 }
867 }
868 });
869 }
870 Err(e) => {
871 log::error!(
872 "Failed to parse query {}: {} | payload: {}",
873 query_id,
874 e,
875 serde_json::to_string(&wrapped.query).unwrap_or_default()
876 );
877 }
878 }
879 } else {
880 log::warn!(
882 "No registered query handler for {}, falling back to select all",
883 query_id
884 );
885 let cellmap = registry.get_or_create(entity_type).select(|_| true);
886 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
887 }
888 }
889
890 MykoMessage::View(wrapped) => {
891 let tx_id: Arc<str> = wrapped
892 .view
893 .get("tx")
894 .and_then(|v| v.as_str())
895 .unwrap_or("unknown")
896 .into();
897 let view_id = &wrapped.view_id;
898 let item_type = &wrapped.view_item_type;
899
900 if session.has_subscription(&tx_id) {
902 log::debug!(
903 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
904 session.client_id,
905 tx_id,
906 view_id,
907 item_type
908 );
909 return Ok(());
910 }
911 if let Ok(mut map) = view_ids_by_tx.lock() {
912 map.insert(tx_id.clone(), view_id.clone());
913 }
914 if let Ok(mut map) = subscribe_started_by_tx.lock() {
915 map.entry(tx_id.clone()).or_insert_with(Instant::now);
916 }
917
918 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
919 log::trace!(
920 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
921 session.client_id,
922 tx_id,
923 view_id,
924 item_type,
925 wrapped.window
926 );
927
928 let request_context = Arc::new(RequestContext::from_client(
929 tx_id.clone(),
930 session.client_id.clone(),
931 host_id,
932 ));
933
934 if let Some(view_data) = handler_registry.get_view(view_id) {
935 let parsed = (view_data.parse)(wrapped.view.clone());
936 match parsed {
937 Ok(any_view) => {
938 log::trace!(
939 "View parsed successfully client={} tx={} view_id={}",
940 session.client_id,
941 tx_id,
942 view_id
943 );
944 let cell_factory = view_data.cell_factory;
947 let registry = registry.clone();
948 let ctx = ctx.clone();
949 let window = wrapped.window.clone();
950 let view_id_clone = view_id.clone();
951 let sub_tx = subscribe_tx.clone();
952 let priority_tx = priority_tx.clone();
953 let drop_logger = drop_logger.clone();
954 tokio::task::spawn_blocking(move || {
955 match cell_factory(any_view, registry, request_context, ctx) {
956 Ok(filtered_cellmap) => {
957 log::trace!(
958 "View cell factory succeeded tx={} view_id={}",
959 tx_id,
960 view_id_clone
961 );
962 let _ = sub_tx.send(SubscriptionReady::View {
963 tx_id,
964 view_id: view_id_clone,
965 cellmap: filtered_cellmap,
966 window,
967 });
968 }
969 Err(e) => {
970 log::error!(
971 "Failed to create view cell for {}: {}",
972 view_id_clone,
973 e
974 );
975 if let Err(err) = priority_tx.try_send(
976 MykoMessage::ViewError(ViewError {
977 tx: tx_id.to_string(),
978 view_id: view_id_clone.to_string(),
979 message: e,
980 }),
981 ) {
982 drop_logger.on_drop("ViewError", &err);
983 }
984 }
985 }
986 });
987 }
988 Err(e) => {
989 let message = format!("Failed to parse view {}: {}", view_id, e);
990 log::error!(
991 "{} | payload: {}",
992 message,
993 serde_json::to_string(&wrapped.view).unwrap_or_default()
994 );
995 if let Err(err) =
996 priority_tx.try_send(MykoMessage::ViewError(ViewError {
997 tx: tx_id.to_string(),
998 view_id: view_id.to_string(),
999 message,
1000 }))
1001 {
1002 drop_logger.on_drop("ViewError", &err);
1003 }
1004 }
1005 }
1006 } else {
1007 let message = format!("No registered handler for view: {}", view_id);
1008 log::warn!("{}", message);
1009 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1010 tx: tx_id.to_string(),
1011 view_id: view_id.to_string(),
1012 message,
1013 })) {
1014 drop_logger.on_drop("ViewError", &err);
1015 }
1016 }
1017 }
1018
1019 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1020 log::trace!(
1021 "QueryCancel received: client={} tx={}",
1022 session.client_id,
1023 tx_id
1024 );
1025 let tx_id: Arc<str> = tx_id.into();
1026 if let Ok(mut map) = query_ids_by_tx.lock() {
1027 map.remove(&tx_id);
1028 }
1029 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1030 map.remove(&tx_id);
1031 }
1032 session.cancel(&tx_id);
1033 }
1034
1035 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1036 let tx_id: Arc<str> = tx.into();
1037 log::trace!(
1038 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1039 session.client_id,
1040 tx_id,
1041 window.is_some(),
1042 session.subscription_count()
1043 );
1044 session.update_query_window(&tx_id, window);
1045 }
1046 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1047 log::trace!("View cancel: {}", tx_id);
1048 let tx_id: Arc<str> = tx_id.into();
1049 if let Ok(mut map) = view_ids_by_tx.lock() {
1050 map.remove(&tx_id);
1051 }
1052 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1053 map.remove(&tx_id);
1054 }
1055 session.cancel(&tx_id);
1056 }
1057 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1058 let tx_id: Arc<str> = tx.into();
1059 log::trace!("View window update: {}", tx_id);
1060 session.update_view_window(&tx_id, window);
1061 }
1062
1063 MykoMessage::Report(wrapped) => {
1064 let tx_id: Arc<str> = wrapped
1066 .report
1067 .get("tx")
1068 .and_then(|v| v.as_str())
1069 .unwrap_or("unknown")
1070 .into();
1071 let report_id = &wrapped.report_id;
1072
1073 log::trace!(
1074 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1075 session.client_id,
1076 tx_id,
1077 report_id,
1078 session.subscription_count()
1079 );
1080
1081 if let Some(report_data) = handler_registry.get_report(report_id) {
1083 let parsed = (report_data.parse)(wrapped.report.clone());
1085 match parsed {
1086 Ok(any_report) => {
1087 let request_context = Arc::new(RequestContext::from_client(
1088 tx_id.clone(),
1089 session.client_id.clone(),
1090 host_id,
1091 ));
1092
1093 match (report_data.cell_factory)(any_report, request_context, ctx) {
1095 Ok(cell) => {
1096 session.subscribe_report(
1097 tx_id,
1098 report_id.as_str().into(),
1099 cell,
1100 );
1101 }
1102 Err(e) => {
1103 log::error!(
1104 "Failed to create report cell for {}: {}",
1105 report_id,
1106 e
1107 );
1108 }
1109 }
1110 }
1111 Err(e) => {
1112 log::error!(
1113 "Failed to parse report {}: {} | payload: {}",
1114 report_id,
1115 e,
1116 serde_json::to_string(&wrapped.report).unwrap_or_default()
1117 );
1118 }
1119 }
1120 } else {
1121 log::warn!("No registered handler for report: {}", report_id);
1122 }
1123 }
1124
1125 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1126 log::trace!(
1127 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1128 session.client_id,
1129 tx_id,
1130 session.subscription_count()
1131 );
1132 session.cancel(&tx_id.into());
1133 }
1134
1135 MykoMessage::Event(mut event) => {
1136 record_ws_ingest(std::slice::from_ref(&event));
1137 normalize_incoming_event(&mut event, &session.client_id);
1138 if let Err(e) = ctx.apply_event(event) {
1139 log::error!(
1140 "Failed to apply event from client {}: {e}",
1141 session.client_id
1142 );
1143 }
1144 }
1145
1146 MykoMessage::EventBatch(mut events) => {
1147 record_ws_ingest(&events);
1148 let incoming = events.len();
1149 if incoming >= 64 {
1150 log::trace!(
1151 "Received event batch from client {} size={}",
1152 session.client_id,
1153 incoming
1154 );
1155 }
1156 for event in &mut events {
1157 normalize_incoming_event(event, &session.client_id);
1158 }
1159 match ctx.apply_event_batch(events) {
1160 Ok(applied) => {
1161 log::trace!(
1162 "Applied event batch from client {} size={}",
1163 session.client_id,
1164 applied
1165 );
1166 }
1167 Err(e) => {
1168 log::error!(
1169 "Failed to apply event batch from client {}: {}",
1170 session.client_id,
1171 e
1172 );
1173 }
1174 }
1175 }
1176
1177 MykoMessage::Command(wrapped) => {
1178 let tx_id: Arc<str> = wrapped
1180 .command
1181 .get("tx")
1182 .and_then(|v| v.as_str())
1183 .unwrap_or("unknown")
1184 .into();
1185
1186 let command_id = &wrapped.command_id;
1187
1188 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1189 let received_at = Instant::now();
1190 if let Ok(mut map) = command_started_by_tx.lock() {
1191 map.insert(tx_id.clone(), received_at);
1192 }
1193 if let Err(e) = command_tx.send(CommandJob {
1194 tx_id: tx_id.clone(),
1195 command_id: wrapped.command_id.clone(),
1196 command: wrapped.command.clone(),
1197 received_at,
1198 }) {
1199 log::error!(
1200 "Failed to enqueue command {} for client {} tx={}: {}",
1201 command_id,
1202 session.client_id,
1203 tx_id,
1204 e
1205 );
1206 let error = MykoMessage::CommandError(CommandError {
1207 tx: tx_id.to_string(),
1208 command_id: command_id.to_string(),
1209 message: "Command queue unavailable".to_string(),
1210 });
1211 if let Err(err) = priority_tx.try_send(error) {
1212 drop_logger.on_drop("CommandError", &err);
1213 }
1214 }
1215 }
1216
1217 MykoMessage::Ping(PingData { id, timestamp }) => {
1218 let pong = MykoMessage::Ping(PingData { id, timestamp });
1220 if let Err(e) = priority_tx.try_send(pong) {
1221 drop_logger.on_drop("Ping", &e);
1222 }
1223 }
1224
1225 MykoMessage::QueryResponse(resp) => {
1227 log::warn!(
1228 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1229 session.client_id,
1230 resp.tx,
1231 resp.sequence,
1232 resp.upserts.len(),
1233 resp.deletes.len(),
1234 session.subscription_count()
1235 );
1236 }
1237 MykoMessage::QueryError(err) => {
1238 log::warn!(
1239 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1240 session.client_id,
1241 err.tx,
1242 err.query_id,
1243 err.message,
1244 session.subscription_count()
1245 );
1246 }
1247 MykoMessage::ViewResponse(resp) => {
1248 log::warn!(
1249 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1250 session.client_id,
1251 resp.tx,
1252 resp.sequence,
1253 resp.upserts.len(),
1254 resp.deletes.len(),
1255 session.subscription_count()
1256 );
1257 }
1258 MykoMessage::ViewError(err) => {
1259 log::warn!(
1260 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1261 session.client_id,
1262 err.tx,
1263 err.view_id,
1264 err.message,
1265 session.subscription_count()
1266 );
1267 }
1268 MykoMessage::ReportResponse(resp) => {
1269 log::warn!(
1270 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1271 session.client_id,
1272 resp.tx,
1273 session.subscription_count()
1274 );
1275 }
1276 MykoMessage::ReportError(err) => {
1277 log::warn!(
1278 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1279 session.client_id,
1280 err.tx,
1281 err.report_id,
1282 err.message,
1283 session.subscription_count()
1284 );
1285 }
1286 MykoMessage::CommandResponse(resp) => {
1287 if resp.tx.trim().is_empty() {
1288 log::warn!(
1289 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1290 session.client_id,
1291 session.subscription_count()
1292 );
1293 } else {
1294 let correlated = outbound_commands_by_tx
1295 .lock()
1296 .ok()
1297 .and_then(|mut map| map.remove(&resp.tx));
1298 if let Some((command_id, started)) = correlated {
1299 log::debug!(
1300 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1301 session.client_id,
1302 resp.tx,
1303 command_id,
1304 started.elapsed().as_millis(),
1305 session.subscription_count()
1306 );
1307 } else {
1308 log::warn!(
1309 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1310 session.client_id,
1311 resp.tx,
1312 session.subscription_count()
1313 );
1314 }
1315 }
1316 }
1317 MykoMessage::CommandError(err) => {
1318 if err.tx.trim().is_empty() {
1319 log::warn!(
1320 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1321 session.client_id,
1322 err.command_id,
1323 err.message,
1324 session.subscription_count()
1325 );
1326 } else {
1327 let correlated = outbound_commands_by_tx
1328 .lock()
1329 .ok()
1330 .and_then(|mut map| map.remove(&err.tx));
1331 if let Some((command_id, started)) = correlated {
1332 log::warn!(
1333 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1334 session.client_id,
1335 err.tx,
1336 err.command_id,
1337 command_id,
1338 err.message,
1339 started.elapsed().as_millis(),
1340 session.subscription_count()
1341 );
1342 } else {
1343 log::warn!(
1344 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1345 session.client_id,
1346 err.tx,
1347 err.command_id,
1348 err.message,
1349 session.subscription_count()
1350 );
1351 }
1352 }
1353 }
1354 MykoMessage::Benchmark(payload) => {
1355 let stats = ws_benchmark_stats();
1356 ensure_ws_benchmark_logger();
1357 stats.message_count.fetch_add(1, Ordering::Relaxed);
1358 let size = payload.to_string().len() as u64;
1360 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1361 }
1362 MykoMessage::ProtocolSwitch { protocol } => {
1363 log::warn!(
1364 "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1365 session.client_id,
1366 protocol,
1367 session.subscription_count()
1368 );
1369 }
1370 }
1371
1372 Ok(())
1373 }
1374
1375 fn execute_command_job(
1376 ctx: Arc<CellServerCtx>,
1377 priority_tx: &mpsc::Sender<MykoMessage>,
1378 drop_logger: &DropLogger,
1379 client_id: Arc<str>,
1380 job: CommandJob,
1381 ) {
1382 let host_id = ctx.host_id;
1383 let started = Instant::now();
1384 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1385 let command_id = job.command_id.clone();
1386
1387 let mut handler_found = false;
1388 for registration in inventory::iter::<CommandHandlerRegistration> {
1389 if registration.command_id == command_id {
1390 handler_found = true;
1391 let executor = (registration.factory)();
1392
1393 let req = Arc::new(RequestContext::from_client(
1394 job.tx_id.clone(),
1395 client_id.clone(),
1396 host_id,
1397 ));
1398 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1399 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1400 let execute_started = Instant::now();
1401
1402 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1403 Ok(result) => {
1404 let response = MykoMessage::CommandResponse(CommandResponse {
1405 response: result,
1406 tx: job.tx_id.to_string(),
1407 });
1408 if let Err(e) = priority_tx.try_send(response) {
1409 drop_logger.on_drop("CommandResponse", &e);
1410 }
1411 }
1412 Err(e) => {
1413 let error = MykoMessage::CommandError(CommandError {
1414 tx: job.tx_id.to_string(),
1415 command_id: command_id.clone(),
1416 message: e.message,
1417 });
1418 if let Err(err) = priority_tx.try_send(error) {
1419 drop_logger.on_drop("CommandError", &err);
1420 }
1421 }
1422 }
1423 let execute_ms = execute_started.elapsed().as_millis();
1424 let total_ms = job.received_at.elapsed().as_millis();
1425 log::trace!(
1426 target: "myko_server::ws_perf",
1427 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1428 client_id,
1429 job.tx_id,
1430 command_id,
1431 queue_wait_ms,
1432 execute_ms,
1433 total_ms
1434 );
1435 break;
1436 }
1437 }
1438
1439 if !handler_found {
1440 log::warn!("No registered handler for command: {}", command_id);
1441 let error = MykoMessage::CommandError(CommandError {
1442 tx: job.tx_id.to_string(),
1443 command_id: command_id.clone(),
1444 message: format!("Command handler not found: {}", command_id),
1445 });
1446 if let Err(e) = priority_tx.try_send(error) {
1447 drop_logger.on_drop("CommandError", &e);
1448 }
1449 }
1450
1451 if !handler_found {
1452 log::debug!(
1453 target: "myko_server::ws_perf",
1454 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1455 client_id,
1456 job.tx_id,
1457 command_id,
1458 queue_wait_ms,
1459 job.received_at.elapsed().as_millis()
1460 );
1461 }
1462 }
1463}
1464
1465struct ChannelWriter {
1470 tx: mpsc::Sender<OutboundMessage>,
1471 deferred_tx: mpsc::Sender<DeferredOutbound>,
1472 drop_logger: Arc<DropLogger>,
1473 use_binary_writer: Arc<AtomicBool>,
1474}
1475
1476impl WsWriter for ChannelWriter {
1477 fn send(&self, msg: MykoMessage) {
1478 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1480 self.drop_logger.on_drop("message", &e);
1481 }
1482 }
1483
1484 fn protocol(&self) -> myko::client::MykoProtocol {
1485 if self.use_binary_writer.load(Ordering::SeqCst) {
1486 myko::client::MykoProtocol::MSGPACK
1487 } else {
1488 myko::client::MykoProtocol::JSON
1489 }
1490 }
1491
1492 fn send_serialized_command(
1493 &self,
1494 tx: Arc<str>,
1495 command_id: String,
1496 payload: EncodedCommandMessage,
1497 ) {
1498 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1499 tx,
1500 command_id,
1501 payload,
1502 }) {
1503 self.drop_logger.on_drop("serialized_command", &e);
1504 }
1505 }
1506
1507 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1508 if let Err(e) = self
1509 .deferred_tx
1510 .try_send(DeferredOutbound::Report(tx, output))
1511 {
1512 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1513 }
1514 }
1515
1516 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1517 if let Err(e) = self
1518 .deferred_tx
1519 .try_send(DeferredOutbound::Query { response, is_view })
1520 {
1521 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1522 }
1523 }
1524}
1525
1526#[cfg(test)]
1527mod tests {
1528 use super::*;
1529
1530 #[test]
1531 fn test_channel_writer() {
1532 let (tx, mut rx) = mpsc::channel(10);
1533 let (deferred_tx, _deferred_rx) = mpsc::channel(10);
1534 let drop_logger = Arc::new(DropLogger::new("test-client".into()));
1535 let writer = ChannelWriter {
1536 tx,
1537 deferred_tx,
1538 drop_logger,
1539 use_binary_writer: Arc::new(AtomicBool::new(false)),
1540 };
1541
1542 let msg = MykoMessage::Ping(PingData {
1543 id: "test".to_string(),
1544 timestamp: 0,
1545 });
1546 writer.send(msg);
1547
1548 let received = rx.try_recv().unwrap();
1549 assert!(matches!(
1550 received,
1551 OutboundMessage::Message(MykoMessage::Ping(_))
1552 ));
1553 }
1554}