1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24 report::AnyOutput,
25 request::RequestContext,
26 server::{
27 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28 client_registry::try_client_registry,
29 },
30 wire::{
31 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33 },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37 accept_async_with_config,
38 tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
/// Exact text-frame payload a client sends to explicitly request the msgpack
/// (binary) protocol; the connection loop compares incoming text frames
/// against it before attempting to parse them as JSON.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
/// Process-wide counters of events received over WebSocket, keyed by the
/// event's item type.
struct WsIngestStats {
    // One counter per item type. `record_ws_ingest` increments them and the
    // ingest logger thread swaps them back to zero when it reports.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide message/byte counters reported by the benchmark logger thread.
///
/// NOTE(review): the code that increments these counters is not visible in
/// this part of the file — confirm which path feeds them.
struct WsBenchmarkStats {
    // Number of messages since the logger last swapped the counter to zero.
    message_count: AtomicU64,
    // Number of bytes since the logger last swapped the counter to zero.
    total_bytes: AtomicU64,
}
54
// Lazily initialised shared ingest counters; access through `ws_ingest_stats()`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Flipped to `true` by the first caller of `ensure_ws_ingest_logger()` so the
// logger thread is spawned at most once.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily initialised shared benchmark counters; access through `ws_benchmark_stats()`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Flipped to `true` by the first caller of `ensure_ws_benchmark_logger()` so the
// logger thread is spawned at most once.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 if count > 0 { bytes / count } else { 0 },
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 if let Some(obj) = event.item.as_object_mut()
194 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195 {
196 for reg in iter_fallback_to_id_registrations() {
197 if reg.entity_type == event.item_type {
198 let field = reg.field_name_json;
199 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201 }
202 }
203 }
204 }
205}
206
/// Rate-limited reporter for outbound messages that could not be queued for
/// a client.
struct DropLogger {
    // Client the dropped messages were destined for; included in the warning.
    client_id: Arc<str>,
    // Drops counted since the last emitted warning (reset when one is logged).
    dropped: std::sync::atomic::AtomicU64,
    // Wall-clock time of the last emitted warning, in milliseconds since the
    // Unix epoch; 0 until the first warning.
    last_log_ms: std::sync::atomic::AtomicU64,
}
216
217impl DropLogger {
218 fn new(client_id: Arc<str>) -> Self {
219 Self {
220 client_id,
221 dropped: std::sync::atomic::AtomicU64::new(0),
222 last_log_ms: std::sync::atomic::AtomicU64::new(0),
223 }
224 }
225
226 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227 use std::sync::atomic::Ordering;
228
229 self.dropped.fetch_add(1, Ordering::Relaxed);
230
231 let now_ms = std::time::SystemTime::now()
233 .duration_since(std::time::UNIX_EPOCH)
234 .unwrap_or_default()
235 .as_millis() as u64;
236 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237 if now_ms.saturating_sub(last_ms) < 1000 {
238 return;
239 }
240
241 if self
242 .last_log_ms
243 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244 .is_err()
245 {
246 return;
247 }
248
249 let n = self.dropped.swap(0, Ordering::Relaxed);
250 log::warn!(
251 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252 n,
253 self.client_id,
254 kind,
255 err
256 );
257 }
258}
259
/// A client command queued for execution by the per-connection command task.
struct CommandJob {
    // Transaction id taken from the command's "tx" field ("unknown" if absent).
    tx_id: Arc<str>,
    // Identifier of the command being invoked.
    command_id: String,
    // Raw JSON payload of the command as received from the client.
    command: serde_json::Value,
    // When the command message was received, captured before it was enqueued.
    received_at: Instant,
}
266
/// Result of a query/view cell factory that ran on a blocking thread, sent
/// back to the connection loop so it can attach the subscription to the
/// session.
enum SubscriptionReady {
    /// A query cell map is ready; the loop hands it to `session.subscribe_query`.
    Query {
        // Transaction id of the originating subscribe request.
        tx_id: Arc<str>,
        // Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window carried over from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view cell map is ready; the loop hands it to
    /// `session.subscribe_view_with_id`.
    View {
        // Transaction id of the originating subscribe request.
        tx_id: Arc<str>,
        // Identifier of the view that was requested.
        view_id: Arc<str>,
        // Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window carried over from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
}
281
/// An item on the normal-priority outbound queue consumed by the writer task.
enum OutboundMessage {
    /// A message the writer task serializes itself (msgpack or JSON, depending
    /// on the connection's current protocol flag).
    Message(MykoMessage),
    /// A command whose wire encoding was produced up front; the writer task
    /// sends the payload as-is and records `tx`/`command_id` as an outstanding
    /// outbound command.
    SerializedCommand {
        // Transaction id of the command.
        tx: Arc<str>,
        // Identifier of the command.
        command_id: String,
        // Pre-encoded payload (JSON text or msgpack bytes).
        payload: EncodedCommandMessage,
    },
}
290
/// An outbound item whose conversion to a wire message is postponed until the
/// writer task dequeues it.
enum DeferredOutbound {
    /// Report output for the given transaction id; converted with `to_value()`
    /// into a `ReportResponse` by the writer task.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view response; converted with `into_wire()` by the writer
    /// task.
    Query {
        // The not-yet-converted response.
        response: PendingQueryResponse,
        // `true` to send it as a `ViewResponse`, `false` as a `QueryResponse`.
        is_view: bool,
    },
}
298
/// Stateless namespace for the WebSocket connection-handling functions; all
/// per-connection state lives inside `handle_connection`.
pub struct WsHandler;
301
302impl WsHandler {
303 #[allow(clippy::too_many_arguments)]
305 pub async fn handle_connection(
306 stream: TcpStream,
307 addr: SocketAddr,
308 ctx: Arc<CellServerCtx>,
309 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310 ensure_ws_ingest_logger();
311
312 let host_id = ctx.host_id;
313
314 let mut ws_config = WebSocketConfig::default();
315 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318 let (mut write, mut read) = ws_stream.split();
319
320 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328 let use_binary = Arc::new(AtomicBool::new(false));
330
331 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334 let writer = ChannelWriter {
335 tx: tx.clone(),
336 deferred_tx: deferred_tx.clone(),
337 drop_logger: drop_logger.clone(),
338 use_binary_writer: use_binary.clone(),
339 };
340
341 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343 tx: tx.clone(),
344 deferred_tx: deferred_tx.clone(),
345 drop_logger: drop_logger.clone(),
346 use_binary_writer: use_binary.clone(),
347 });
348 if let Some(registry) = try_client_registry() {
349 registry.register(client_id.clone(), writer_arc);
350 }
351
352 let mut session = ClientSession::new(client_id.clone(), writer);
353
354 let use_binary_writer = use_binary.clone();
355 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356 Arc::new(Mutex::new(HashMap::new()));
357 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358 Arc::new(Mutex::new(HashMap::new()));
359 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360 Arc::new(Mutex::new(HashMap::new()));
361 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362 Arc::new(Mutex::new(HashMap::new()));
363 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364 Arc::new(Mutex::new(HashMap::new()));
365
366 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368 let client_entity = Client {
370 id: ClientId(client_id.clone()),
371 server_id: host_id.to_string().into(),
372 address: Some(Arc::from(addr.to_string())),
373 windback: None,
374 };
375 if let Err(e) = ctx.set(&client_entity) {
376 log::error!("Failed to persist client entity: {e}");
377 }
378
379 log::info!("Client connected: {} from {}", client_id, addr);
380
381 let write_ctx = ctx.clone();
382 let write_client_id = client_id.clone();
383 let write_addr = addr;
384 let command_ctx = ctx.clone();
385 let command_priority_tx = priority_tx.clone();
386 let command_drop_logger = drop_logger.clone();
387 let command_client_id = client_id.clone();
388
389 let write_task = tokio::spawn(async move {
391 let _ctx = write_ctx;
392 let mut normal_open = true;
393 let mut priority_open = true;
394 let mut deferred_open = true;
395 while normal_open || priority_open || deferred_open {
396 let msg = tokio::select! {
397 biased;
398 maybe = priority_rx.recv(), if priority_open => {
399 match maybe {
400 Some(msg) => OutboundMessage::Message(msg),
401 None => {
402 priority_open = false;
403 continue;
404 }
405 }
406 }
407 maybe = deferred_rx.recv(), if deferred_open => {
408 match maybe {
409 Some(DeferredOutbound::Report(tx, output)) => {
410 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
411 response: output.to_value(),
412 tx: tx.to_string(),
413 }))
414 }
415 Some(DeferredOutbound::Query { response, is_view }) => {
416 if is_view {
417 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
418 } else {
419 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
420 }
421 }
422 None => {
423 deferred_open = false;
424 continue;
425 }
426 }
427 }
428 maybe = rx.recv(), if normal_open => {
429 match maybe {
430 Some(msg) => msg,
431 None => {
432 normal_open = false;
433 continue;
434 }
435 }
436 }
437 };
438 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
439 OutboundMessage::SerializedCommand { tx, .. } => {
440 ("command", Some(tx.clone()), None, None, None, None)
441 }
442 OutboundMessage::Message(msg) => match msg {
443 MykoMessage::ViewResponse(r) => (
444 "view_response",
445 Some(r.tx.clone()),
446 Some(r.sequence),
447 Some(r.upserts.len()),
448 Some(r.deletes.len()),
449 r.total_count,
450 ),
451 MykoMessage::QueryResponse(r) => (
452 "query_response",
453 Some(r.tx.clone()),
454 Some(r.sequence),
455 Some(r.upserts.len()),
456 Some(r.deletes.len()),
457 r.total_count,
458 ),
459 MykoMessage::CommandResponse(r) => (
460 "command_response",
461 Some(Arc::<str>::from(r.tx.clone())),
462 None,
463 None,
464 None,
465 None,
466 ),
467 MykoMessage::CommandError(r) => (
468 "command_error",
469 Some(Arc::<str>::from(r.tx.clone())),
470 None,
471 None,
472 None,
473 None,
474 ),
475 _ => ("other", None, None, None, None, None),
476 },
477 };
478
479 match &msg {
480 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
481 if !tx.trim().is_empty()
482 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
483 {
484 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
485 }
486 }
487 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
488 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
489 && !tx_id.trim().is_empty()
490 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
491 {
492 map.insert(
493 tx_id.to_string(),
494 (wrapped.command_id.clone(), Instant::now()),
495 );
496 }
497 }
498 _ => {}
499 }
500
501 let ws_msg = match &msg {
502 OutboundMessage::SerializedCommand {
503 payload: EncodedCommandMessage::Json(json),
504 ..
505 } => Message::Text(json.clone().into()),
506 OutboundMessage::SerializedCommand {
507 payload: EncodedCommandMessage::Msgpack(bytes),
508 ..
509 } => Message::Binary(bytes.clone().into()),
510 OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
511 match rmp_serde::to_vec(msg) {
512 Ok(bytes) => Message::Binary(bytes.into()),
513 Err(e) => {
514 log::error!("Failed to serialize message to msgpack: {}", e);
515 continue;
516 }
517 }
518 }
519 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
520 Ok(json) => Message::Text(json.into()),
521 Err(e) => {
522 log::error!("Failed to serialize message to JSON: {}", e);
523 continue;
524 }
525 },
526 };
527 let payload_bytes = match &ws_msg {
528 Message::Binary(b) => b.len(),
529 Message::Text(t) => t.len(),
530 _ => 0,
531 };
532
533 if let Err(err) = write.send(ws_msg).await {
534 log::error!(
535 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
536 write_client_id,
537 write_addr,
538 kind,
539 tx_id,
540 seq,
541 payload_bytes,
542 use_binary_writer.load(Ordering::SeqCst),
543 err
544 );
545 break;
546 }
547 }
548 if let Some(registry) = try_client_registry() {
551 registry.unregister(&write_client_id);
552 log::info!(
553 "WebSocket writer unregistered client {} from {} (write task exiting)",
554 write_client_id,
555 write_addr,
556 );
557 }
558 log::warn!(
559 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
560 write_client_id,
561 write_addr,
562 normal_open,
563 priority_open,
564 deferred_open
565 );
566 });
567
568 let command_started_cleanup = command_started_by_tx.clone();
571 let command_task = tokio::spawn(async move {
572 while let Some(job) = command_rx.recv().await {
573 let command_ctx = command_ctx.clone();
574 let command_priority_tx = command_priority_tx.clone();
575 let command_drop_logger = command_drop_logger.clone();
576 let command_client_id = command_client_id.clone();
577 let tx_id = job.tx_id.clone();
578 let started_map = command_started_cleanup.clone();
579 match tokio::task::spawn_blocking(move || {
580 Self::execute_command_job(
581 command_ctx,
582 &command_priority_tx,
583 command_drop_logger.as_ref(),
584 command_client_id,
585 job,
586 );
587 })
588 .await
589 {
590 Ok(()) => {}
591 Err(e) => {
592 log::error!("Command worker panicked: {}", e);
593 }
594 }
595 if let Ok(mut map) = started_map.lock() {
597 map.remove(&tx_id);
598 }
599 }
600 });
601
602 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
606 outbound_ttl_interval.tick().await; loop {
608 tokio::select! {
609 Some(ready) = subscribe_rx.recv() => {
611 let tx_id = match &ready {
612 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
613 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
614 };
615 if let Ok(mut map) = subscribe_started_by_tx.lock() {
616 map.remove(&tx_id);
617 }
618 match ready {
619 SubscriptionReady::Query { tx_id, cellmap, window } => {
620 session.subscribe_query(tx_id, cellmap, window);
621 }
622 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
623 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
624 }
625 }
626 }
627 _ = outbound_ttl_interval.tick() => {
631 if let Ok(mut map) = outbound_commands_by_tx.lock() {
632 let before = map.len();
633 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
634 let removed = before - map.len();
635 if removed > 0 {
636 log::debug!(
637 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
638 session.client_id,
639 removed,
640 map.len()
641 );
642 }
643 }
644 }
645 msg = read.next() => {
647 let Some(msg) = msg else { break };
648 let ctx = ctx.clone();
649 let msg = match msg {
650 Ok(m) => m,
651 Err(e) => {
652 log::error!("WebSocket read error from {}: {}", client_id, e);
653 break;
654 }
655 };
656
657 match msg {
658 Message::Binary(data) => {
659 if !use_binary.load(Ordering::SeqCst) {
660 log::debug!(
661 "Client {} switched to binary (msgpack) protocol via auto-detect",
662 client_id
663 );
664 use_binary.store(true, Ordering::SeqCst);
665 }
666
667 match rmp_serde::from_slice::<MykoMessage>(&data) {
668 Ok(myko_msg) => {
669 if let Err(e) = Self::handle_message(
670 &mut session,
671 ctx,
672 &priority_tx,
673 &drop_logger,
674 &query_ids_by_tx,
675 &view_ids_by_tx,
676 &subscribe_started_by_tx,
677 &command_started_by_tx,
678 &outbound_commands_by_tx,
679 &command_tx,
680 &subscribe_tx,
681 myko_msg,
682 ) {
683 log::error!("Error handling message: {}", e);
684 }
685 tokio::task::yield_now().await;
686 }
687 Err(e) => {
688 log::warn!("Failed to parse message from {}: {}", client_id, e);
689 }
690 }
691 }
692 Message::Text(text) => {
693 if text == SWITCH_TO_MSGPACK {
694 log::debug!(
695 "Client {} switched to binary (msgpack) protocol via explicit request",
696 client_id
697 );
698 if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
699 protocol: "msgpack".into(),
700 }) {
701 drop_logger.on_drop("ProtocolSwitch", &e);
702 }
703 use_binary.store(true, Ordering::SeqCst);
704 continue;
705 }
706
707 match serde_json::from_str::<MykoMessage>(&text) {
708 Ok(myko_msg) => {
709 if let Err(e) = Self::handle_message(
710 &mut session,
711 ctx,
712 &priority_tx,
713 &drop_logger,
714 &query_ids_by_tx,
715 &view_ids_by_tx,
716 &subscribe_started_by_tx,
717 &command_started_by_tx,
718 &outbound_commands_by_tx,
719 &command_tx,
720 &subscribe_tx,
721 myko_msg,
722 ) {
723 log::error!("Error handling message: {}", e);
724 }
725 tokio::task::yield_now().await;
726 }
727 Err(e) => {
728 log::warn!(
729 "Failed to parse JSON message from {}: {} | raw: {}",
730 client_id,
731 e,
732 if text.len() > 1000 {
733 &text[..1000]
734 } else {
735 &text
736 }
737 );
738 }
739 }
740 }
741 Message::Ping(data) => {
742 log::trace!("Ping from {}", client_id);
743 let _ = data;
744 }
745 Message::Pong(_) => {
746 log::trace!("Pong from {}", client_id);
747 }
748 Message::Close(frame) => {
749 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
750 break;
751 }
752 Message::Frame(_) => {}
753 }
754 }
755 }
756 }
757
758 drop(session); write_task.abort();
761 command_task.abort();
762
763 if let Some(registry) = try_client_registry() {
765 registry.unregister(&client_id);
766 }
767
768 if let Err(e) = ctx.del(&client_entity) {
770 log::error!("Failed to delete client entity: {e}");
771 }
772
773 log::info!("Client disconnected: {} from {}", client_id, addr);
774
775 Ok(())
776 }
777
778 #[allow(clippy::too_many_arguments)]
780 fn handle_message<W: WsWriter>(
781 session: &mut ClientSession<W>,
782 ctx: Arc<CellServerCtx>,
783 priority_tx: &mpsc::Sender<MykoMessage>,
784 drop_logger: &Arc<DropLogger>,
785 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
786 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
787 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
788 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
789 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
790 command_tx: &mpsc::UnboundedSender<CommandJob>,
791 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
792 msg: MykoMessage,
793 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
794 let handler_registry = ctx.handler_registry.clone();
795
796 let registry = ctx.registry.clone();
797
798 let host_id = ctx.host_id;
799
800 match msg {
801 MykoMessage::Query(wrapped) => {
802 let tx_id: Arc<str> = wrapped
804 .query
805 .get("tx")
806 .and_then(|v| v.as_str())
807 .unwrap_or("unknown")
808 .into();
809 let query_id = &wrapped.query_id;
810 let entity_type = &wrapped.query_item_type;
811
812 if session.has_subscription(&tx_id) {
815 log::debug!(
816 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
817 session.client_id,
818 tx_id,
819 query_id,
820 entity_type
821 );
822 return Ok(());
823 }
824 if let Ok(mut map) = query_ids_by_tx.lock() {
825 map.insert(tx_id.clone(), query_id.clone());
826 }
827 if let Ok(mut map) = subscribe_started_by_tx.lock() {
828 map.entry(tx_id.clone()).or_insert_with(Instant::now);
829 }
830
831 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
832 log::trace!(
833 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
834 session.client_id,
835 tx_id,
836 query_id,
837 entity_type,
838 wrapped.window.is_some(),
839 session.subscription_count()
840 );
841
842 let request_context = Arc::new(RequestContext::from_client(
843 tx_id.clone(),
844 session.client_id.clone(),
845 host_id,
846 ));
847
848 if let Some(query_data) = handler_registry.get_query(query_id) {
849 let parsed = (query_data.parse)(wrapped.query.clone());
850 match parsed {
851 Ok(any_query) => {
852 let cell_factory = query_data.cell_factory;
855 let registry = registry.clone();
856 let request_context = request_context.clone();
857 let ctx = ctx.clone();
858 let window = wrapped.window.clone();
859 let query_id = query_id.clone();
860 let sub_tx = subscribe_tx.clone();
861 tokio::task::spawn_blocking(move || {
862 match cell_factory(any_query, registry, request_context, Some(ctx))
863 {
864 Ok(filtered_cellmap) => {
865 let _ = sub_tx.send(SubscriptionReady::Query {
866 tx_id,
867 cellmap: filtered_cellmap,
868 window,
869 });
870 }
871 Err(e) => {
872 log::error!(
873 "Failed to create query cell for {}: {}",
874 query_id,
875 e
876 );
877 }
878 }
879 });
880 }
881 Err(e) => {
882 log::error!(
883 "Failed to parse query {}: {} | payload: {}",
884 query_id,
885 e,
886 serde_json::to_string(&wrapped.query).unwrap_or_default()
887 );
888 }
889 }
890 } else {
891 log::warn!(
893 "No registered query handler for {}, falling back to select all",
894 query_id
895 );
896 let cellmap = registry.get_or_create(entity_type).select(|_| true);
897 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
898 }
899 }
900
901 MykoMessage::View(wrapped) => {
902 let tx_id: Arc<str> = wrapped
903 .view
904 .get("tx")
905 .and_then(|v| v.as_str())
906 .unwrap_or("unknown")
907 .into();
908 let view_id = &wrapped.view_id;
909 let item_type = &wrapped.view_item_type;
910
911 if session.has_subscription(&tx_id) {
913 log::debug!(
914 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
915 session.client_id,
916 tx_id,
917 view_id,
918 item_type
919 );
920 return Ok(());
921 }
922 if let Ok(mut map) = view_ids_by_tx.lock() {
923 map.insert(tx_id.clone(), view_id.clone());
924 }
925 if let Ok(mut map) = subscribe_started_by_tx.lock() {
926 map.entry(tx_id.clone()).or_insert_with(Instant::now);
927 }
928
929 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
930 log::trace!(
931 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
932 session.client_id,
933 tx_id,
934 view_id,
935 item_type,
936 wrapped.window
937 );
938
939 let request_context = Arc::new(RequestContext::from_client(
940 tx_id.clone(),
941 session.client_id.clone(),
942 host_id,
943 ));
944
945 if let Some(view_data) = handler_registry.get_view(view_id) {
946 let parsed = (view_data.parse)(wrapped.view.clone());
947 match parsed {
948 Ok(any_view) => {
949 log::trace!(
950 "View parsed successfully client={} tx={} view_id={}",
951 session.client_id,
952 tx_id,
953 view_id
954 );
955 let cell_factory = view_data.cell_factory;
958 let registry = registry.clone();
959 let ctx = ctx.clone();
960 let window = wrapped.window.clone();
961 let view_id_clone = view_id.clone();
962 let sub_tx = subscribe_tx.clone();
963 let priority_tx = priority_tx.clone();
964 let drop_logger = drop_logger.clone();
965 tokio::task::spawn_blocking(move || {
966 match cell_factory(any_view, registry, request_context, ctx) {
967 Ok(filtered_cellmap) => {
968 log::trace!(
969 "View cell factory succeeded tx={} view_id={}",
970 tx_id,
971 view_id_clone
972 );
973 let _ = sub_tx.send(SubscriptionReady::View {
974 tx_id,
975 view_id: view_id_clone,
976 cellmap: filtered_cellmap,
977 window,
978 });
979 }
980 Err(e) => {
981 log::error!(
982 "Failed to create view cell for {}: {}",
983 view_id_clone,
984 e
985 );
986 if let Err(err) = priority_tx.try_send(
987 MykoMessage::ViewError(ViewError {
988 tx: tx_id.to_string(),
989 view_id: view_id_clone.to_string(),
990 message: e,
991 }),
992 ) {
993 drop_logger.on_drop("ViewError", &err);
994 }
995 }
996 }
997 });
998 }
999 Err(e) => {
1000 let message = format!("Failed to parse view {}: {}", view_id, e);
1001 log::error!(
1002 "{} | payload: {}",
1003 message,
1004 serde_json::to_string(&wrapped.view).unwrap_or_default()
1005 );
1006 if let Err(err) =
1007 priority_tx.try_send(MykoMessage::ViewError(ViewError {
1008 tx: tx_id.to_string(),
1009 view_id: view_id.to_string(),
1010 message,
1011 }))
1012 {
1013 drop_logger.on_drop("ViewError", &err);
1014 }
1015 }
1016 }
1017 } else {
1018 let message = format!("No registered handler for view: {}", view_id);
1019 log::warn!("{}", message);
1020 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1021 tx: tx_id.to_string(),
1022 view_id: view_id.to_string(),
1023 message,
1024 })) {
1025 drop_logger.on_drop("ViewError", &err);
1026 }
1027 }
1028 }
1029
1030 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1031 log::trace!(
1032 "QueryCancel received: client={} tx={}",
1033 session.client_id,
1034 tx_id
1035 );
1036 let tx_id: Arc<str> = tx_id.into();
1037 if let Ok(mut map) = query_ids_by_tx.lock() {
1038 map.remove(&tx_id);
1039 }
1040 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1041 map.remove(&tx_id);
1042 }
1043 session.cancel(&tx_id);
1044 }
1045
1046 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1047 let tx_id: Arc<str> = tx.into();
1048 log::trace!(
1049 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1050 session.client_id,
1051 tx_id,
1052 window.is_some(),
1053 session.subscription_count()
1054 );
1055 session.update_query_window(&tx_id, window);
1056 }
1057 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1058 log::trace!("View cancel: {}", tx_id);
1059 let tx_id: Arc<str> = tx_id.into();
1060 if let Ok(mut map) = view_ids_by_tx.lock() {
1061 map.remove(&tx_id);
1062 }
1063 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1064 map.remove(&tx_id);
1065 }
1066 session.cancel(&tx_id);
1067 }
1068 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1069 let tx_id: Arc<str> = tx.into();
1070 log::trace!("View window update: {}", tx_id);
1071 session.update_view_window(&tx_id, window);
1072 }
1073
1074 MykoMessage::Report(wrapped) => {
1075 let tx_id: Arc<str> = wrapped
1077 .report
1078 .get("tx")
1079 .and_then(|v| v.as_str())
1080 .unwrap_or("unknown")
1081 .into();
1082 let report_id = &wrapped.report_id;
1083
1084 log::trace!(
1085 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1086 session.client_id,
1087 tx_id,
1088 report_id,
1089 session.subscription_count()
1090 );
1091
1092 if let Some(report_data) = handler_registry.get_report(report_id) {
1094 let parsed = (report_data.parse)(wrapped.report.clone());
1096 match parsed {
1097 Ok(any_report) => {
1098 let request_context = Arc::new(RequestContext::from_client(
1099 tx_id.clone(),
1100 session.client_id.clone(),
1101 host_id,
1102 ));
1103
1104 match (report_data.cell_factory)(any_report, request_context, ctx) {
1106 Ok(cell) => {
1107 session.subscribe_report(
1108 tx_id,
1109 report_id.as_str().into(),
1110 cell,
1111 );
1112 }
1113 Err(e) => {
1114 log::error!(
1115 "Failed to create report cell for {}: {}",
1116 report_id,
1117 e
1118 );
1119 }
1120 }
1121 }
1122 Err(e) => {
1123 log::error!(
1124 "Failed to parse report {}: {} | payload: {}",
1125 report_id,
1126 e,
1127 serde_json::to_string(&wrapped.report).unwrap_or_default()
1128 );
1129 }
1130 }
1131 } else {
1132 log::warn!("No registered handler for report: {}", report_id);
1133 }
1134 }
1135
1136 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1137 log::trace!(
1138 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1139 session.client_id,
1140 tx_id,
1141 session.subscription_count()
1142 );
1143 session.cancel(&tx_id.into());
1144 }
1145
1146 MykoMessage::Event(mut event) => {
1147 record_ws_ingest(std::slice::from_ref(&event));
1148 event.sanitize_null_bytes();
1149 normalize_incoming_event(&mut event, &session.client_id);
1150 if let Err(e) = ctx.apply_event(event) {
1151 log::error!(
1152 "Failed to apply event from client {}: {e}",
1153 session.client_id
1154 );
1155 }
1156 }
1157
1158 MykoMessage::EventBatch(mut events) => {
1159 record_ws_ingest(&events);
1160 let incoming = events.len();
1161 if incoming >= 64 {
1162 log::trace!(
1163 "Received event batch from client {} size={}",
1164 session.client_id,
1165 incoming
1166 );
1167 }
1168 for event in &mut events {
1169 event.sanitize_null_bytes();
1170 normalize_incoming_event(event, &session.client_id);
1171 }
1172 match ctx.apply_event_batch(events) {
1173 Ok(applied) => {
1174 log::trace!(
1175 "Applied event batch from client {} size={}",
1176 session.client_id,
1177 applied
1178 );
1179 }
1180 Err(e) => {
1181 log::error!(
1182 "Failed to apply event batch from client {}: {}",
1183 session.client_id,
1184 e
1185 );
1186 }
1187 }
1188 }
1189
1190 MykoMessage::Command(wrapped) => {
1191 let tx_id: Arc<str> = wrapped
1193 .command
1194 .get("tx")
1195 .and_then(|v| v.as_str())
1196 .unwrap_or("unknown")
1197 .into();
1198
1199 let command_id = &wrapped.command_id;
1200
1201 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1202 let received_at = Instant::now();
1203 if let Ok(mut map) = command_started_by_tx.lock() {
1204 map.insert(tx_id.clone(), received_at);
1205 }
1206 if let Err(e) = command_tx.send(CommandJob {
1207 tx_id: tx_id.clone(),
1208 command_id: wrapped.command_id.clone(),
1209 command: wrapped.command.clone(),
1210 received_at,
1211 }) {
1212 log::error!(
1213 "Failed to enqueue command {} for client {} tx={}: {}",
1214 command_id,
1215 session.client_id,
1216 tx_id,
1217 e
1218 );
1219 let error = MykoMessage::CommandError(CommandError {
1220 tx: tx_id.to_string(),
1221 command_id: command_id.to_string(),
1222 message: "Command queue unavailable".to_string(),
1223 });
1224 if let Err(err) = priority_tx.try_send(error) {
1225 drop_logger.on_drop("CommandError", &err);
1226 }
1227 }
1228 }
1229
1230 MykoMessage::Ping(PingData { id, timestamp }) => {
1231 let pong = MykoMessage::Ping(PingData { id, timestamp });
1233 if let Err(e) = priority_tx.try_send(pong) {
1234 drop_logger.on_drop("Ping", &e);
1235 }
1236 }
1237
1238 MykoMessage::QueryResponse(resp) => {
1240 log::warn!(
1241 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1242 session.client_id,
1243 resp.tx,
1244 resp.sequence,
1245 resp.upserts.len(),
1246 resp.deletes.len(),
1247 session.subscription_count()
1248 );
1249 }
1250 MykoMessage::QueryError(err) => {
1251 log::warn!(
1252 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1253 session.client_id,
1254 err.tx,
1255 err.query_id,
1256 err.message,
1257 session.subscription_count()
1258 );
1259 }
1260 MykoMessage::ViewResponse(resp) => {
1261 log::warn!(
1262 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1263 session.client_id,
1264 resp.tx,
1265 resp.sequence,
1266 resp.upserts.len(),
1267 resp.deletes.len(),
1268 session.subscription_count()
1269 );
1270 }
1271 MykoMessage::ViewError(err) => {
1272 log::warn!(
1273 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1274 session.client_id,
1275 err.tx,
1276 err.view_id,
1277 err.message,
1278 session.subscription_count()
1279 );
1280 }
1281 MykoMessage::ReportResponse(resp) => {
1282 log::warn!(
1283 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1284 session.client_id,
1285 resp.tx,
1286 session.subscription_count()
1287 );
1288 }
1289 MykoMessage::ReportError(err) => {
1290 log::warn!(
1291 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1292 session.client_id,
1293 err.tx,
1294 err.report_id,
1295 err.message,
1296 session.subscription_count()
1297 );
1298 }
1299 MykoMessage::CommandResponse(resp) => {
1300 if resp.tx.trim().is_empty() {
1301 log::warn!(
1302 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1303 session.client_id,
1304 session.subscription_count()
1305 );
1306 } else {
1307 let correlated = outbound_commands_by_tx
1308 .lock()
1309 .ok()
1310 .and_then(|mut map| map.remove(&resp.tx));
1311 if let Some((command_id, started)) = correlated {
1312 log::debug!(
1313 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1314 session.client_id,
1315 resp.tx,
1316 command_id,
1317 started.elapsed().as_millis(),
1318 session.subscription_count()
1319 );
1320 } else {
1321 log::warn!(
1322 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1323 session.client_id,
1324 resp.tx,
1325 session.subscription_count()
1326 );
1327 }
1328 }
1329 }
1330 MykoMessage::CommandError(err) => {
1331 if err.tx.trim().is_empty() {
1332 log::warn!(
1333 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1334 session.client_id,
1335 err.command_id,
1336 err.message,
1337 session.subscription_count()
1338 );
1339 } else {
1340 let correlated = outbound_commands_by_tx
1341 .lock()
1342 .ok()
1343 .and_then(|mut map| map.remove(&err.tx));
1344 if let Some((command_id, started)) = correlated {
1345 log::warn!(
1346 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1347 session.client_id,
1348 err.tx,
1349 err.command_id,
1350 command_id,
1351 err.message,
1352 started.elapsed().as_millis(),
1353 session.subscription_count()
1354 );
1355 } else {
1356 log::warn!(
1357 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1358 session.client_id,
1359 err.tx,
1360 err.command_id,
1361 err.message,
1362 session.subscription_count()
1363 );
1364 }
1365 }
1366 }
1367 MykoMessage::Benchmark(payload) => {
1368 let stats = ws_benchmark_stats();
1369 ensure_ws_benchmark_logger();
1370 stats.message_count.fetch_add(1, Ordering::Relaxed);
1371 let size = payload.to_string().len() as u64;
1373 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1374 }
1375 MykoMessage::ProtocolSwitch { protocol } => {
1376 log::warn!(
1377 "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1378 session.client_id,
1379 protocol,
1380 session.subscription_count()
1381 );
1382 }
1383 }
1384
1385 Ok(())
1386 }
1387
1388 fn execute_command_job(
1389 ctx: Arc<CellServerCtx>,
1390 priority_tx: &mpsc::Sender<MykoMessage>,
1391 drop_logger: &DropLogger,
1392 client_id: Arc<str>,
1393 job: CommandJob,
1394 ) {
1395 let host_id = ctx.host_id;
1396 let started = Instant::now();
1397 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1398 let command_id = job.command_id.clone();
1399
1400 let mut handler_found = false;
1401 for registration in inventory::iter::<CommandHandlerRegistration> {
1402 if registration.command_id == command_id {
1403 handler_found = true;
1404 let executor = (registration.factory)();
1405
1406 let req = Arc::new(RequestContext::from_client(
1407 job.tx_id.clone(),
1408 client_id.clone(),
1409 host_id,
1410 ));
1411 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1412 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1413 let execute_started = Instant::now();
1414
1415 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1416 Ok(result) => {
1417 let response = MykoMessage::CommandResponse(CommandResponse {
1418 response: result,
1419 tx: job.tx_id.to_string(),
1420 });
1421 if let Err(e) = priority_tx.try_send(response) {
1422 drop_logger.on_drop("CommandResponse", &e);
1423 }
1424 }
1425 Err(e) => {
1426 let error = MykoMessage::CommandError(CommandError {
1427 tx: job.tx_id.to_string(),
1428 command_id: command_id.clone(),
1429 message: e.message,
1430 });
1431 if let Err(err) = priority_tx.try_send(error) {
1432 drop_logger.on_drop("CommandError", &err);
1433 }
1434 }
1435 }
1436 let execute_ms = execute_started.elapsed().as_millis();
1437 let total_ms = job.received_at.elapsed().as_millis();
1438 log::trace!(
1439 target: "myko_server::ws_perf",
1440 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1441 client_id,
1442 job.tx_id,
1443 command_id,
1444 queue_wait_ms,
1445 execute_ms,
1446 total_ms
1447 );
1448 break;
1449 }
1450 }
1451
1452 if !handler_found {
1453 log::warn!("No registered handler for command: {}", command_id);
1454 let error = MykoMessage::CommandError(CommandError {
1455 tx: job.tx_id.to_string(),
1456 command_id: command_id.clone(),
1457 message: format!("Command handler not found: {}", command_id),
1458 });
1459 if let Err(e) = priority_tx.try_send(error) {
1460 drop_logger.on_drop("CommandError", &e);
1461 }
1462 }
1463
1464 if !handler_found {
1465 log::debug!(
1466 target: "myko_server::ws_perf",
1467 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1468 client_id,
1469 job.tx_id,
1470 command_id,
1471 queue_wait_ms,
1472 job.received_at.elapsed().as_millis()
1473 );
1474 }
1475 }
1476}
1477
/// `WsWriter` implementation that hands outbound traffic to in-process channels.
///
/// Every send goes through the non-blocking `try_send`; when the channel rejects an
/// item, the error is passed to `drop_logger` and the item is not retried.
struct ChannelWriter {
    /// Channel for `OutboundMessage` items (plain messages and serialized commands).
    tx: mpsc::Sender<OutboundMessage>,
    /// Channel for `DeferredOutbound` items (report responses and query/view responses).
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Receives the `try_send` error whenever an item could not be enqueued.
    drop_logger: Arc<DropLogger>,
    /// When `true`, `protocol()` reports MSGPACK; otherwise it reports JSON.
    use_binary_writer: Arc<AtomicBool>,
}
1488
1489impl WsWriter for ChannelWriter {
1490 fn send(&self, msg: MykoMessage) {
1491 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1493 self.drop_logger.on_drop("message", &e);
1494 }
1495 }
1496
1497 fn protocol(&self) -> myko::client::MykoProtocol {
1498 if self.use_binary_writer.load(Ordering::SeqCst) {
1499 myko::client::MykoProtocol::MSGPACK
1500 } else {
1501 myko::client::MykoProtocol::JSON
1502 }
1503 }
1504
1505 fn send_serialized_command(
1506 &self,
1507 tx: Arc<str>,
1508 command_id: String,
1509 payload: EncodedCommandMessage,
1510 ) {
1511 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1512 tx,
1513 command_id,
1514 payload,
1515 }) {
1516 self.drop_logger.on_drop("serialized_command", &e);
1517 }
1518 }
1519
1520 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1521 if let Err(e) = self
1522 .deferred_tx
1523 .try_send(DeferredOutbound::Report(tx, output))
1524 {
1525 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1526 }
1527 }
1528
1529 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1530 if let Err(e) = self
1531 .deferred_tx
1532 .try_send(DeferredOutbound::Query { response, is_view })
1533 {
1534 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1535 }
1536 }
1537}
1538
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` must appear on the outbound channel
    /// wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel::<OutboundMessage>(10);
        // The deferred receiver stays bound so that channel remains open for the test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let binary_flag = Arc::new(AtomicBool::new(false));

        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: binary_flag,
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let delivered = outbound_rx.try_recv().unwrap();
        let is_ping = matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        );
        assert!(is_ping);
    }
}