1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{
24 iter_client_id_registrations, iter_fallback_to_id_registrations,
25 iter_server_owned_registrations,
26 },
27 report::AnyOutput,
28 request::RequestContext,
29 server::{
30 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
31 client_registry::try_client_registry,
32 },
33 wire::{
34 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
35 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
36 },
37};
38use tokio::{net::TcpStream, sync::mpsc, time::interval};
39use tokio_tungstenite::{
40 accept_async_with_config,
41 tungstenite::{Message, protocol::WebSocketConfig},
42};
43use uuid::Uuid;
44
/// Exact text-frame payload a client sends to explicitly request the msgpack
/// (binary) protocol. The connection loop compares incoming text frames
/// against this value before attempting to parse them as JSON.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
48
/// Process-wide counters of events received over WebSocket connections.
///
/// Incremented by `record_ws_ingest` and drained (swapped back to zero) once
/// per second by the thread started in `ensure_ws_ingest_logger`.
struct WsIngestStats {
    /// Number of events seen since the last drain, keyed by the event's
    /// `item_type` string.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
52
/// Process-wide message/byte counters reported by the benchmark logger.
///
/// Both counters are swapped back to zero once per second by the thread
/// started in `ensure_ws_benchmark_logger`.
struct WsBenchmarkStats {
    /// Messages counted since the last drain.
    message_count: AtomicU64,
    /// Bytes counted since the last drain.
    total_bytes: AtomicU64,
}
57
// Lazily-initialised shared ingest counters (see `ws_ingest_stats`).
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_ingest_logger`, so the
// logger thread is only started once per process.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily-initialised shared benchmark counters (see `ws_benchmark_stats`).
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_benchmark_logger`, so the
// logger thread is only started once per process.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
62
63fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
64 WS_BENCHMARK_STATS
65 .get_or_init(|| {
66 Arc::new(WsBenchmarkStats {
67 message_count: AtomicU64::new(0),
68 total_bytes: AtomicU64::new(0),
69 })
70 })
71 .clone()
72}
73
74fn ensure_ws_benchmark_logger() {
75 if WS_BENCHMARK_LOGGER_STARTED
76 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
77 .is_err()
78 {
79 return;
80 }
81
82 let stats = ws_benchmark_stats();
83 thread::Builder::new()
84 .name("ws-benchmark-logger".into())
85 .spawn(move || {
86 loop {
87 thread::sleep(Duration::from_secs(1));
88
89 let count = stats.message_count.swap(0, Ordering::Relaxed);
90 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
91
92 if count == 0 {
93 continue;
94 }
95
96 log::info!(
97 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
98 count,
99 bytes,
100 if count > 0 { bytes / count } else { 0 },
101 );
102 }
103 })
104 .expect("failed to spawn websocket benchmark logger thread");
105}
106
107fn ws_ingest_stats() -> Arc<WsIngestStats> {
108 WS_INGEST_STATS
109 .get_or_init(|| {
110 Arc::new(WsIngestStats {
111 counts_by_type: DashMap::new(),
112 })
113 })
114 .clone()
115}
116
117fn ensure_ws_ingest_logger() {
118 if WS_INGEST_LOGGER_STARTED
119 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
120 .is_err()
121 {
122 return;
123 }
124
125 let stats = ws_ingest_stats();
126 thread::Builder::new()
127 .name("ws-ingest-logger".into())
128 .spawn(move || {
129 loop {
130 thread::sleep(Duration::from_secs(1));
131
132 let mut per_type = Vec::new();
133 let mut total = 0u64;
134 for entry in &stats.counts_by_type {
135 let count = entry.value().swap(0, Ordering::Relaxed);
136 if count == 0 {
137 continue;
138 }
139 total += count;
140 per_type.push((entry.key().clone(), count));
141 }
142
143 if total == 0 {
144 continue;
145 }
146
147 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
148 let details = per_type
149 .into_iter()
150 .map(|(entity_type, count)| format!("{entity_type}={count}"))
151 .collect::<Vec<_>>()
152 .join(" ");
153
154 log::info!("WebSocket ingest last_1s total={} {}", total, details);
155 }
156 })
157 .expect("failed to spawn websocket ingest logger thread");
158}
159
160fn record_ws_ingest(events: &[MEvent]) {
161 if events.is_empty() {
162 return;
163 }
164
165 let stats = ws_ingest_stats();
166 for event in events {
167 let counter = stats
168 .counts_by_type
169 .entry(Arc::<str>::from(event.item_type.as_str()))
170 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
171 .clone();
172 counter.fetch_add(1, Ordering::Relaxed);
173 }
174}
175
176fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
177 if event.change_type != MEventType::SET {
178 return;
179 }
180
181 for reg in iter_client_id_registrations() {
183 if reg.entity_type == event.item_type {
184 if let Some(obj) = event.item.as_object_mut() {
185 obj.insert(
186 reg.field_name_json.to_string(),
187 serde_json::Value::String(client_id.to_string()),
188 );
189 }
190 break;
191 }
192 }
193
194 for reg in iter_server_owned_registrations() {
196 if reg.entity_type == event.item_type {
197 if let Some(obj) = event.item.as_object_mut() {
198 let field = reg.field_name_json;
199 let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
200 if current.is_empty() {
201 obj.insert(
202 field.to_string(),
203 serde_json::Value::String(host_id.to_string()),
204 );
205 }
206 }
207 break;
208 }
209 }
210
211 if let Some(obj) = event.item.as_object_mut()
214 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
215 {
216 for reg in iter_fallback_to_id_registrations() {
217 if reg.entity_type == event.item_type {
218 let field = reg.field_name_json;
219 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
220 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
221 }
222 }
223 }
224 }
225}
226
/// Rate-limited reporter for outbound messages that could not be queued for
/// a client. See `DropLogger::on_drop` for the throttling rule.
struct DropLogger {
    /// Id of the client this logger reports for; included in the warning.
    client_id: Arc<str>,
    /// Drops counted since the last emitted warning.
    dropped: std::sync::atomic::AtomicU64,
    /// Wall-clock time (milliseconds since the Unix epoch) of the last
    /// emitted warning; `0` until the first warning is logged.
    last_log_ms: std::sync::atomic::AtomicU64,
}
236
237impl DropLogger {
238 fn new(client_id: Arc<str>) -> Self {
239 Self {
240 client_id,
241 dropped: std::sync::atomic::AtomicU64::new(0),
242 last_log_ms: std::sync::atomic::AtomicU64::new(0),
243 }
244 }
245
246 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
247 use std::sync::atomic::Ordering;
248
249 self.dropped.fetch_add(1, Ordering::Relaxed);
250
251 let now_ms = std::time::SystemTime::now()
253 .duration_since(std::time::UNIX_EPOCH)
254 .unwrap_or_default()
255 .as_millis() as u64;
256 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
257 if now_ms.saturating_sub(last_ms) < 1000 {
258 return;
259 }
260
261 if self
262 .last_log_ms
263 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
264 .is_err()
265 {
266 return;
267 }
268
269 let n = self.dropped.swap(0, Ordering::Relaxed);
270 log::warn!(
271 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
272 n,
273 self.client_id,
274 kind,
275 err
276 );
277 }
278}
279
/// A command received from a client, queued for the per-connection command
/// worker task, which runs it via `execute_command_job`.
struct CommandJob {
    /// Transaction id taken from the command payload's `tx` field
    /// (`"unknown"` when absent).
    tx_id: Arc<str>,
    /// Identifier of the command being invoked.
    command_id: String,
    /// Raw JSON command payload as sent by the client.
    command: serde_json::Value,
    /// When the command message was handled by the connection loop.
    received_at: Instant,
}
286
/// Result of building a subscription's cell map on a blocking task, handed
/// back to the connection loop so the session can be updated there.
enum SubscriptionReady {
    /// A query cell map is ready; the loop calls `session.subscribe_query`.
    Query {
        /// Transaction id of the originating query request.
        tx_id: Arc<str>,
        /// Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view cell map is ready; the loop calls
    /// `session.subscribe_view_with_id`.
    View {
        /// Transaction id of the originating view request.
        tx_id: Arc<str>,
        /// Identifier of the view that was requested.
        view_id: Arc<str>,
        /// Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the request.
        window: Option<myko::wire::QueryWindow>,
    },
}
301
/// An item travelling through the normal outbound channel to the write task.
enum OutboundMessage {
    /// A message the write task serializes itself (msgpack or JSON, depending
    /// on the connection's binary flag at send time).
    Message(MykoMessage),
    /// A command whose payload is already encoded; the write task sends the
    /// payload as-is and records `tx` -> `command_id` for outbound tracking.
    SerializedCommand {
        /// Transaction id of the command.
        tx: Arc<str>,
        /// Identifier of the command.
        command_id: String,
        /// Pre-encoded JSON text or msgpack bytes.
        payload: EncodedCommandMessage,
    },
}
310
/// An outbound item whose conversion to a wire message is deferred until the
/// write task dequeues it.
enum DeferredOutbound {
    /// Report output for the given transaction id; converted with
    /// `to_value()` into a `ReportResponse` by the write task.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view response; converted with `into_wire()` by the
    /// write task.
    Query {
        /// The not-yet-encoded response.
        response: PendingQueryResponse,
        /// `true` to send as a `ViewResponse`, `false` as a `QueryResponse`.
        is_view: bool,
    },
}
318
/// Stateless entry point for serving WebSocket client connections; all
/// functionality lives in associated functions such as `handle_connection`.
pub struct WsHandler;
321
322impl WsHandler {
323 #[allow(clippy::too_many_arguments)]
325 pub async fn handle_connection(
326 stream: TcpStream,
327 addr: SocketAddr,
328 ctx: Arc<CellServerCtx>,
329 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
330 ensure_ws_ingest_logger();
331
332 let host_id = ctx.host_id;
333
334 let mut ws_config = WebSocketConfig::default();
335 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
336 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
337 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
338 let (mut write, mut read) = ws_stream.split();
339
340 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
343 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
344 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
345 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
346 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
347
348 let use_binary = Arc::new(AtomicBool::new(false));
350
351 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
353 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
354 let writer = ChannelWriter {
355 tx: tx.clone(),
356 deferred_tx: deferred_tx.clone(),
357 drop_logger: drop_logger.clone(),
358 use_binary_writer: use_binary.clone(),
359 };
360
361 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
363 tx: tx.clone(),
364 deferred_tx: deferred_tx.clone(),
365 drop_logger: drop_logger.clone(),
366 use_binary_writer: use_binary.clone(),
367 });
368 if let Some(registry) = try_client_registry() {
369 registry.register(client_id.clone(), writer_arc);
370 }
371
372 let mut session = ClientSession::new(client_id.clone(), writer);
373
374 let use_binary_writer = use_binary.clone();
375 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
376 Arc::new(Mutex::new(HashMap::new()));
377 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
378 Arc::new(Mutex::new(HashMap::new()));
379 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
380 Arc::new(Mutex::new(HashMap::new()));
381 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
382 Arc::new(Mutex::new(HashMap::new()));
383 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
384 Arc::new(Mutex::new(HashMap::new()));
385
386 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
387
388 let client_entity = Client {
390 id: ClientId(client_id.clone()),
391 server_id: host_id.to_string().into(),
392 address: Some(Arc::from(addr.to_string())),
393 windback: None,
394 };
395 if let Err(e) = ctx.set(&client_entity) {
396 log::error!("Failed to persist client entity: {e}");
397 }
398
399 log::info!("Client connected: {} from {}", client_id, addr);
400
401 let write_ctx = ctx.clone();
402 let write_client_id = client_id.clone();
403 let write_addr = addr;
404 let command_ctx = ctx.clone();
405 let command_priority_tx = priority_tx.clone();
406 let command_drop_logger = drop_logger.clone();
407 let command_client_id = client_id.clone();
408
409 let write_task = tokio::spawn(async move {
411 let _ctx = write_ctx;
412 let mut normal_open = true;
413 let mut priority_open = true;
414 let mut deferred_open = true;
415 while normal_open || priority_open || deferred_open {
416 let msg = tokio::select! {
417 biased;
418 maybe = priority_rx.recv(), if priority_open => {
419 match maybe {
420 Some(msg) => OutboundMessage::Message(msg),
421 None => {
422 priority_open = false;
423 continue;
424 }
425 }
426 }
427 maybe = deferred_rx.recv(), if deferred_open => {
428 match maybe {
429 Some(DeferredOutbound::Report(tx, output)) => {
430 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
431 response: output.to_value(),
432 tx: tx.to_string(),
433 }))
434 }
435 Some(DeferredOutbound::Query { response, is_view }) => {
436 if is_view {
437 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
438 } else {
439 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
440 }
441 }
442 None => {
443 deferred_open = false;
444 continue;
445 }
446 }
447 }
448 maybe = rx.recv(), if normal_open => {
449 match maybe {
450 Some(msg) => msg,
451 None => {
452 normal_open = false;
453 continue;
454 }
455 }
456 }
457 };
458 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
459 OutboundMessage::SerializedCommand { tx, .. } => {
460 ("command", Some(tx.clone()), None, None, None, None)
461 }
462 OutboundMessage::Message(msg) => match msg {
463 MykoMessage::ViewResponse(r) => (
464 "view_response",
465 Some(r.tx.clone()),
466 Some(r.sequence),
467 Some(r.upserts.len()),
468 Some(r.deletes.len()),
469 r.total_count,
470 ),
471 MykoMessage::QueryResponse(r) => (
472 "query_response",
473 Some(r.tx.clone()),
474 Some(r.sequence),
475 Some(r.upserts.len()),
476 Some(r.deletes.len()),
477 r.total_count,
478 ),
479 MykoMessage::CommandResponse(r) => (
480 "command_response",
481 Some(Arc::<str>::from(r.tx.clone())),
482 None,
483 None,
484 None,
485 None,
486 ),
487 MykoMessage::CommandError(r) => (
488 "command_error",
489 Some(Arc::<str>::from(r.tx.clone())),
490 None,
491 None,
492 None,
493 None,
494 ),
495 _ => ("other", None, None, None, None, None),
496 },
497 };
498
499 match &msg {
500 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
501 if !tx.trim().is_empty()
502 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
503 {
504 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
505 }
506 }
507 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
508 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
509 && !tx_id.trim().is_empty()
510 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
511 {
512 map.insert(
513 tx_id.to_string(),
514 (wrapped.command_id.clone(), Instant::now()),
515 );
516 }
517 }
518 _ => {}
519 }
520
521 let ws_msg = match &msg {
522 OutboundMessage::SerializedCommand {
523 payload: EncodedCommandMessage::Json(json),
524 ..
525 } => Message::Text(json.clone().into()),
526 OutboundMessage::SerializedCommand {
527 payload: EncodedCommandMessage::Msgpack(bytes),
528 ..
529 } => Message::Binary(bytes.clone().into()),
530 OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
531 match rmp_serde::to_vec(msg) {
532 Ok(bytes) => Message::Binary(bytes.into()),
533 Err(e) => {
534 log::error!("Failed to serialize message to msgpack: {}", e);
535 continue;
536 }
537 }
538 }
539 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
540 Ok(json) => Message::Text(json.into()),
541 Err(e) => {
542 log::error!("Failed to serialize message to JSON: {}", e);
543 continue;
544 }
545 },
546 };
547 let payload_bytes = match &ws_msg {
548 Message::Binary(b) => b.len(),
549 Message::Text(t) => t.len(),
550 _ => 0,
551 };
552
553 if let Err(err) = write.send(ws_msg).await {
554 log::error!(
555 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
556 write_client_id,
557 write_addr,
558 kind,
559 tx_id,
560 seq,
561 payload_bytes,
562 use_binary_writer.load(Ordering::SeqCst),
563 err
564 );
565 break;
566 }
567 }
568 if let Some(registry) = try_client_registry() {
571 registry.unregister(&write_client_id);
572 log::info!(
573 "WebSocket writer unregistered client {} from {} (write task exiting)",
574 write_client_id,
575 write_addr,
576 );
577 }
578 log::warn!(
579 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
580 write_client_id,
581 write_addr,
582 normal_open,
583 priority_open,
584 deferred_open
585 );
586 });
587
588 let command_started_cleanup = command_started_by_tx.clone();
591 let command_task = tokio::spawn(async move {
592 while let Some(job) = command_rx.recv().await {
593 let command_ctx = command_ctx.clone();
594 let command_priority_tx = command_priority_tx.clone();
595 let command_drop_logger = command_drop_logger.clone();
596 let command_client_id = command_client_id.clone();
597 let tx_id = job.tx_id.clone();
598 let started_map = command_started_cleanup.clone();
599 match tokio::task::spawn_blocking(move || {
600 Self::execute_command_job(
601 command_ctx,
602 &command_priority_tx,
603 command_drop_logger.as_ref(),
604 command_client_id,
605 job,
606 );
607 })
608 .await
609 {
610 Ok(()) => {}
611 Err(e) => {
612 log::error!("Command worker panicked: {}", e);
613 }
614 }
615 if let Ok(mut map) = started_map.lock() {
617 map.remove(&tx_id);
618 }
619 }
620 });
621
622 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
626 outbound_ttl_interval.tick().await; loop {
628 tokio::select! {
629 Some(ready) = subscribe_rx.recv() => {
631 let tx_id = match &ready {
632 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
633 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
634 };
635 if let Ok(mut map) = subscribe_started_by_tx.lock() {
636 map.remove(&tx_id);
637 }
638 match ready {
639 SubscriptionReady::Query { tx_id, cellmap, window } => {
640 session.subscribe_query(tx_id, cellmap, window);
641 }
642 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
643 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
644 }
645 }
646 }
647 _ = outbound_ttl_interval.tick() => {
651 if let Ok(mut map) = outbound_commands_by_tx.lock() {
652 let before = map.len();
653 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
654 let removed = before - map.len();
655 if removed > 0 {
656 log::debug!(
657 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
658 session.client_id,
659 removed,
660 map.len()
661 );
662 }
663 }
664 }
665 msg = read.next() => {
667 let Some(msg) = msg else { break };
668 let ctx = ctx.clone();
669 let msg = match msg {
670 Ok(m) => m,
671 Err(e) => {
672 log::error!("WebSocket read error from {}: {}", client_id, e);
673 break;
674 }
675 };
676
677 match msg {
678 Message::Binary(data) => {
679 if !use_binary.load(Ordering::SeqCst) {
680 log::debug!(
681 "Client {} switched to binary (msgpack) protocol via auto-detect",
682 client_id
683 );
684 use_binary.store(true, Ordering::SeqCst);
685 }
686
687 match rmp_serde::from_slice::<MykoMessage>(&data) {
688 Ok(myko_msg) => {
689 if let Err(e) = Self::handle_message(
690 &mut session,
691 ctx,
692 &priority_tx,
693 &drop_logger,
694 &query_ids_by_tx,
695 &view_ids_by_tx,
696 &subscribe_started_by_tx,
697 &command_started_by_tx,
698 &outbound_commands_by_tx,
699 &command_tx,
700 &subscribe_tx,
701 myko_msg,
702 ) {
703 log::error!("Error handling message: {}", e);
704 }
705 tokio::task::yield_now().await;
706 }
707 Err(e) => {
708 log::warn!("Failed to parse message from {}: {}", client_id, e);
709 }
710 }
711 }
712 Message::Text(text) => {
713 if text == SWITCH_TO_MSGPACK {
714 log::debug!(
715 "Client {} switched to binary (msgpack) protocol via explicit request",
716 client_id
717 );
718 if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
719 protocol: "msgpack".into(),
720 }) {
721 drop_logger.on_drop("ProtocolSwitch", &e);
722 }
723 use_binary.store(true, Ordering::SeqCst);
724 continue;
725 }
726
727 match serde_json::from_str::<MykoMessage>(&text) {
728 Ok(myko_msg) => {
729 if let Err(e) = Self::handle_message(
730 &mut session,
731 ctx,
732 &priority_tx,
733 &drop_logger,
734 &query_ids_by_tx,
735 &view_ids_by_tx,
736 &subscribe_started_by_tx,
737 &command_started_by_tx,
738 &outbound_commands_by_tx,
739 &command_tx,
740 &subscribe_tx,
741 myko_msg,
742 ) {
743 log::error!("Error handling message: {}", e);
744 }
745 tokio::task::yield_now().await;
746 }
747 Err(e) => {
748 log::warn!(
749 "Failed to parse JSON message from {}: {} | raw: {}",
750 client_id,
751 e,
752 if text.len() > 1000 {
753 &text[..1000]
754 } else {
755 &text
756 }
757 );
758 }
759 }
760 }
761 Message::Ping(data) => {
762 log::trace!("Ping from {}", client_id);
763 let _ = data;
764 }
765 Message::Pong(_) => {
766 log::trace!("Pong from {}", client_id);
767 }
768 Message::Close(frame) => {
769 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
770 break;
771 }
772 Message::Frame(_) => {}
773 }
774 }
775 }
776 }
777
778 drop(session); write_task.abort();
781 command_task.abort();
782
783 if let Some(registry) = try_client_registry() {
785 registry.unregister(&client_id);
786 }
787
788 if let Err(e) = ctx.del(&client_entity) {
790 log::error!("Failed to delete client entity: {e}");
791 }
792
793 log::info!("Client disconnected: {} from {}", client_id, addr);
794
795 Ok(())
796 }
797
798 #[allow(clippy::too_many_arguments)]
800 fn handle_message<W: WsWriter>(
801 session: &mut ClientSession<W>,
802 ctx: Arc<CellServerCtx>,
803 priority_tx: &mpsc::Sender<MykoMessage>,
804 drop_logger: &Arc<DropLogger>,
805 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
806 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
807 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
808 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
809 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
810 command_tx: &mpsc::UnboundedSender<CommandJob>,
811 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
812 msg: MykoMessage,
813 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
814 let handler_registry = ctx.handler_registry.clone();
815
816 let registry = ctx.registry.clone();
817
818 let host_id = ctx.host_id;
819
820 match msg {
821 MykoMessage::Query(wrapped) => {
822 let tx_id: Arc<str> = wrapped
824 .query
825 .get("tx")
826 .and_then(|v| v.as_str())
827 .unwrap_or("unknown")
828 .into();
829 let query_id = &wrapped.query_id;
830 let entity_type = &wrapped.query_item_type;
831
832 if session.has_subscription(&tx_id) {
835 log::debug!(
836 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
837 session.client_id,
838 tx_id,
839 query_id,
840 entity_type
841 );
842 return Ok(());
843 }
844 if let Ok(mut map) = query_ids_by_tx.lock() {
845 map.insert(tx_id.clone(), query_id.clone());
846 }
847 if let Ok(mut map) = subscribe_started_by_tx.lock() {
848 map.entry(tx_id.clone()).or_insert_with(Instant::now);
849 }
850
851 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
852 log::trace!(
853 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
854 session.client_id,
855 tx_id,
856 query_id,
857 entity_type,
858 wrapped.window.is_some(),
859 session.subscription_count()
860 );
861
862 let request_context = Arc::new(RequestContext::from_client(
863 tx_id.clone(),
864 session.client_id.clone(),
865 host_id,
866 ));
867
868 if let Some(query_data) = handler_registry.get_query(query_id) {
869 let parsed = (query_data.parse)(wrapped.query.clone());
870 match parsed {
871 Ok(any_query) => {
872 let cell_factory = query_data.cell_factory;
875 let registry = registry.clone();
876 let request_context = request_context.clone();
877 let ctx = ctx.clone();
878 let window = wrapped.window.clone();
879 let query_id = query_id.clone();
880 let sub_tx = subscribe_tx.clone();
881 tokio::task::spawn_blocking(move || {
882 match cell_factory(any_query, registry, request_context, Some(ctx))
883 {
884 Ok(filtered_cellmap) => {
885 let _ = sub_tx.send(SubscriptionReady::Query {
886 tx_id,
887 cellmap: filtered_cellmap,
888 window,
889 });
890 }
891 Err(e) => {
892 log::error!(
893 "Failed to create query cell for {}: {}",
894 query_id,
895 e
896 );
897 }
898 }
899 });
900 }
901 Err(e) => {
902 log::error!(
903 "Failed to parse query {}: {} | payload: {}",
904 query_id,
905 e,
906 serde_json::to_string(&wrapped.query).unwrap_or_default()
907 );
908 }
909 }
910 } else {
911 log::warn!(
913 "No registered query handler for {}, falling back to select all",
914 query_id
915 );
916 let cellmap = registry.get_or_create(entity_type).select(|_| true);
917 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
918 }
919 }
920
921 MykoMessage::View(wrapped) => {
922 let tx_id: Arc<str> = wrapped
923 .view
924 .get("tx")
925 .and_then(|v| v.as_str())
926 .unwrap_or("unknown")
927 .into();
928 let view_id = &wrapped.view_id;
929 let item_type = &wrapped.view_item_type;
930
931 if session.has_subscription(&tx_id) {
933 log::debug!(
934 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
935 session.client_id,
936 tx_id,
937 view_id,
938 item_type
939 );
940 return Ok(());
941 }
942 if let Ok(mut map) = view_ids_by_tx.lock() {
943 map.insert(tx_id.clone(), view_id.clone());
944 }
945 if let Ok(mut map) = subscribe_started_by_tx.lock() {
946 map.entry(tx_id.clone()).or_insert_with(Instant::now);
947 }
948
949 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
950 log::trace!(
951 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
952 session.client_id,
953 tx_id,
954 view_id,
955 item_type,
956 wrapped.window
957 );
958
959 let request_context = Arc::new(RequestContext::from_client(
960 tx_id.clone(),
961 session.client_id.clone(),
962 host_id,
963 ));
964
965 if let Some(view_data) = handler_registry.get_view(view_id) {
966 let parsed = (view_data.parse)(wrapped.view.clone());
967 match parsed {
968 Ok(any_view) => {
969 log::trace!(
970 "View parsed successfully client={} tx={} view_id={}",
971 session.client_id,
972 tx_id,
973 view_id
974 );
975 let cell_factory = view_data.cell_factory;
978 let registry = registry.clone();
979 let ctx = ctx.clone();
980 let window = wrapped.window.clone();
981 let view_id_clone = view_id.clone();
982 let sub_tx = subscribe_tx.clone();
983 let priority_tx = priority_tx.clone();
984 let drop_logger = drop_logger.clone();
985 tokio::task::spawn_blocking(move || {
986 match cell_factory(any_view, registry, request_context, ctx) {
987 Ok(filtered_cellmap) => {
988 log::trace!(
989 "View cell factory succeeded tx={} view_id={}",
990 tx_id,
991 view_id_clone
992 );
993 let _ = sub_tx.send(SubscriptionReady::View {
994 tx_id,
995 view_id: view_id_clone,
996 cellmap: filtered_cellmap,
997 window,
998 });
999 }
1000 Err(e) => {
1001 log::error!(
1002 "Failed to create view cell for {}: {}",
1003 view_id_clone,
1004 e
1005 );
1006 if let Err(err) = priority_tx.try_send(
1007 MykoMessage::ViewError(ViewError {
1008 tx: tx_id.to_string(),
1009 view_id: view_id_clone.to_string(),
1010 message: e,
1011 }),
1012 ) {
1013 drop_logger.on_drop("ViewError", &err);
1014 }
1015 }
1016 }
1017 });
1018 }
1019 Err(e) => {
1020 let message = format!("Failed to parse view {}: {}", view_id, e);
1021 log::error!(
1022 "{} | payload: {}",
1023 message,
1024 serde_json::to_string(&wrapped.view).unwrap_or_default()
1025 );
1026 if let Err(err) =
1027 priority_tx.try_send(MykoMessage::ViewError(ViewError {
1028 tx: tx_id.to_string(),
1029 view_id: view_id.to_string(),
1030 message,
1031 }))
1032 {
1033 drop_logger.on_drop("ViewError", &err);
1034 }
1035 }
1036 }
1037 } else {
1038 let message = format!("No registered handler for view: {}", view_id);
1039 log::warn!("{}", message);
1040 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1041 tx: tx_id.to_string(),
1042 view_id: view_id.to_string(),
1043 message,
1044 })) {
1045 drop_logger.on_drop("ViewError", &err);
1046 }
1047 }
1048 }
1049
1050 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1051 log::trace!(
1052 "QueryCancel received: client={} tx={}",
1053 session.client_id,
1054 tx_id
1055 );
1056 let tx_id: Arc<str> = tx_id.into();
1057 if let Ok(mut map) = query_ids_by_tx.lock() {
1058 map.remove(&tx_id);
1059 }
1060 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1061 map.remove(&tx_id);
1062 }
1063 session.cancel(&tx_id);
1064 }
1065
1066 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1067 let tx_id: Arc<str> = tx.into();
1068 log::trace!(
1069 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1070 session.client_id,
1071 tx_id,
1072 window.is_some(),
1073 session.subscription_count()
1074 );
1075 session.update_query_window(&tx_id, window);
1076 }
1077 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1078 log::trace!("View cancel: {}", tx_id);
1079 let tx_id: Arc<str> = tx_id.into();
1080 if let Ok(mut map) = view_ids_by_tx.lock() {
1081 map.remove(&tx_id);
1082 }
1083 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1084 map.remove(&tx_id);
1085 }
1086 session.cancel(&tx_id);
1087 }
1088 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1089 let tx_id: Arc<str> = tx.into();
1090 log::trace!("View window update: {}", tx_id);
1091 session.update_view_window(&tx_id, window);
1092 }
1093
1094 MykoMessage::Report(wrapped) => {
1095 let tx_id: Arc<str> = wrapped
1097 .report
1098 .get("tx")
1099 .and_then(|v| v.as_str())
1100 .unwrap_or("unknown")
1101 .into();
1102 let report_id = &wrapped.report_id;
1103
1104 log::trace!(
1105 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1106 session.client_id,
1107 tx_id,
1108 report_id,
1109 session.subscription_count()
1110 );
1111
1112 if let Some(report_data) = handler_registry.get_report(report_id) {
1114 let parsed = (report_data.parse)(wrapped.report.clone());
1116 match parsed {
1117 Ok(any_report) => {
1118 let request_context = Arc::new(RequestContext::from_client(
1119 tx_id.clone(),
1120 session.client_id.clone(),
1121 host_id,
1122 ));
1123
1124 match (report_data.cell_factory)(any_report, request_context, ctx) {
1126 Ok(cell) => {
1127 session.subscribe_report(
1128 tx_id,
1129 report_id.as_str().into(),
1130 cell,
1131 );
1132 }
1133 Err(e) => {
1134 log::error!(
1135 "Failed to create report cell for {}: {}",
1136 report_id,
1137 e
1138 );
1139 }
1140 }
1141 }
1142 Err(e) => {
1143 log::error!(
1144 "Failed to parse report {}: {} | payload: {}",
1145 report_id,
1146 e,
1147 serde_json::to_string(&wrapped.report).unwrap_or_default()
1148 );
1149 }
1150 }
1151 } else {
1152 log::warn!("No registered handler for report: {}", report_id);
1153 }
1154 }
1155
1156 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1157 log::trace!(
1158 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1159 session.client_id,
1160 tx_id,
1161 session.subscription_count()
1162 );
1163 session.cancel(&tx_id.into());
1164 }
1165
1166 MykoMessage::Event(mut event) => {
1167 record_ws_ingest(std::slice::from_ref(&event));
1168 event.sanitize_null_bytes();
1169 normalize_incoming_event(&mut event, &session.client_id, host_id);
1170 if let Err(e) = ctx.apply_event(event) {
1171 log::error!(
1172 "Failed to apply event from client {}: {e}",
1173 session.client_id
1174 );
1175 }
1176 }
1177
1178 MykoMessage::EventBatch(mut events) => {
1179 record_ws_ingest(&events);
1180 let incoming = events.len();
1181 if incoming >= 64 {
1182 log::trace!(
1183 "Received event batch from client {} size={}",
1184 session.client_id,
1185 incoming
1186 );
1187 }
1188 for event in &mut events {
1189 event.sanitize_null_bytes();
1190 normalize_incoming_event(event, &session.client_id, host_id);
1191 }
1192 match ctx.apply_event_batch(events) {
1193 Ok(applied) => {
1194 log::trace!(
1195 "Applied event batch from client {} size={}",
1196 session.client_id,
1197 applied
1198 );
1199 }
1200 Err(e) => {
1201 log::error!(
1202 "Failed to apply event batch from client {}: {}",
1203 session.client_id,
1204 e
1205 );
1206 }
1207 }
1208 }
1209
1210 MykoMessage::Command(wrapped) => {
1211 let tx_id: Arc<str> = wrapped
1213 .command
1214 .get("tx")
1215 .and_then(|v| v.as_str())
1216 .unwrap_or("unknown")
1217 .into();
1218
1219 let command_id = &wrapped.command_id;
1220
1221 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1222 let received_at = Instant::now();
1223 if let Ok(mut map) = command_started_by_tx.lock() {
1224 map.insert(tx_id.clone(), received_at);
1225 }
1226 if let Err(e) = command_tx.send(CommandJob {
1227 tx_id: tx_id.clone(),
1228 command_id: wrapped.command_id.clone(),
1229 command: wrapped.command.clone(),
1230 received_at,
1231 }) {
1232 log::error!(
1233 "Failed to enqueue command {} for client {} tx={}: {}",
1234 command_id,
1235 session.client_id,
1236 tx_id,
1237 e
1238 );
1239 let error = MykoMessage::CommandError(CommandError {
1240 tx: tx_id.to_string(),
1241 command_id: command_id.to_string(),
1242 message: "Command queue unavailable".to_string(),
1243 });
1244 if let Err(err) = priority_tx.try_send(error) {
1245 drop_logger.on_drop("CommandError", &err);
1246 }
1247 }
1248 }
1249
1250 MykoMessage::Ping(PingData { id, timestamp }) => {
1251 let pong = MykoMessage::Ping(PingData { id, timestamp });
1253 if let Err(e) = priority_tx.try_send(pong) {
1254 drop_logger.on_drop("Ping", &e);
1255 }
1256 }
1257
1258 MykoMessage::QueryResponse(resp) => {
1260 log::warn!(
1261 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1262 session.client_id,
1263 resp.tx,
1264 resp.sequence,
1265 resp.upserts.len(),
1266 resp.deletes.len(),
1267 session.subscription_count()
1268 );
1269 }
1270 MykoMessage::QueryError(err) => {
1271 log::warn!(
1272 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1273 session.client_id,
1274 err.tx,
1275 err.query_id,
1276 err.message,
1277 session.subscription_count()
1278 );
1279 }
1280 MykoMessage::ViewResponse(resp) => {
1281 log::warn!(
1282 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1283 session.client_id,
1284 resp.tx,
1285 resp.sequence,
1286 resp.upserts.len(),
1287 resp.deletes.len(),
1288 session.subscription_count()
1289 );
1290 }
1291 MykoMessage::ViewError(err) => {
1292 log::warn!(
1293 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1294 session.client_id,
1295 err.tx,
1296 err.view_id,
1297 err.message,
1298 session.subscription_count()
1299 );
1300 }
1301 MykoMessage::ReportResponse(resp) => {
1302 log::warn!(
1303 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1304 session.client_id,
1305 resp.tx,
1306 session.subscription_count()
1307 );
1308 }
1309 MykoMessage::ReportError(err) => {
1310 log::warn!(
1311 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1312 session.client_id,
1313 err.tx,
1314 err.report_id,
1315 err.message,
1316 session.subscription_count()
1317 );
1318 }
1319 MykoMessage::CommandResponse(resp) => {
1320 if resp.tx.trim().is_empty() {
1321 log::warn!(
1322 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1323 session.client_id,
1324 session.subscription_count()
1325 );
1326 } else {
1327 let correlated = outbound_commands_by_tx
1328 .lock()
1329 .ok()
1330 .and_then(|mut map| map.remove(&resp.tx));
1331 if let Some((command_id, started)) = correlated {
1332 log::debug!(
1333 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1334 session.client_id,
1335 resp.tx,
1336 command_id,
1337 started.elapsed().as_millis(),
1338 session.subscription_count()
1339 );
1340 } else {
1341 log::warn!(
1342 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1343 session.client_id,
1344 resp.tx,
1345 session.subscription_count()
1346 );
1347 }
1348 }
1349 }
1350 MykoMessage::CommandError(err) => {
1351 if err.tx.trim().is_empty() {
1352 log::warn!(
1353 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1354 session.client_id,
1355 err.command_id,
1356 err.message,
1357 session.subscription_count()
1358 );
1359 } else {
1360 let correlated = outbound_commands_by_tx
1361 .lock()
1362 .ok()
1363 .and_then(|mut map| map.remove(&err.tx));
1364 if let Some((command_id, started)) = correlated {
1365 log::warn!(
1366 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1367 session.client_id,
1368 err.tx,
1369 err.command_id,
1370 command_id,
1371 err.message,
1372 started.elapsed().as_millis(),
1373 session.subscription_count()
1374 );
1375 } else {
1376 log::warn!(
1377 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1378 session.client_id,
1379 err.tx,
1380 err.command_id,
1381 err.message,
1382 session.subscription_count()
1383 );
1384 }
1385 }
1386 }
1387 MykoMessage::Benchmark(payload) => {
1388 let stats = ws_benchmark_stats();
1389 ensure_ws_benchmark_logger();
1390 stats.message_count.fetch_add(1, Ordering::Relaxed);
1391 let size = payload.to_string().len() as u64;
1393 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1394 }
1395 MykoMessage::ProtocolSwitch { protocol } => {
1396 log::warn!(
1397 "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1398 session.client_id,
1399 protocol,
1400 session.subscription_count()
1401 );
1402 }
1403 }
1404
1405 Ok(())
1406 }
1407
1408 fn execute_command_job(
1409 ctx: Arc<CellServerCtx>,
1410 priority_tx: &mpsc::Sender<MykoMessage>,
1411 drop_logger: &DropLogger,
1412 client_id: Arc<str>,
1413 job: CommandJob,
1414 ) {
1415 let host_id = ctx.host_id;
1416 let started = Instant::now();
1417 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1418 let command_id = job.command_id.clone();
1419
1420 let mut handler_found = false;
1421 for registration in inventory::iter::<CommandHandlerRegistration> {
1422 if registration.command_id == command_id {
1423 handler_found = true;
1424 let executor = (registration.factory)();
1425
1426 let req = Arc::new(RequestContext::from_client(
1427 job.tx_id.clone(),
1428 client_id.clone(),
1429 host_id,
1430 ));
1431 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1432 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1433 let execute_started = Instant::now();
1434
1435 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1436 Ok(result) => {
1437 let response = MykoMessage::CommandResponse(CommandResponse {
1438 response: result,
1439 tx: job.tx_id.to_string(),
1440 });
1441 if let Err(e) = priority_tx.try_send(response) {
1442 drop_logger.on_drop("CommandResponse", &e);
1443 }
1444 }
1445 Err(e) => {
1446 let error = MykoMessage::CommandError(CommandError {
1447 tx: job.tx_id.to_string(),
1448 command_id: command_id.clone(),
1449 message: e.message,
1450 });
1451 if let Err(err) = priority_tx.try_send(error) {
1452 drop_logger.on_drop("CommandError", &err);
1453 }
1454 }
1455 }
1456 let execute_ms = execute_started.elapsed().as_millis();
1457 let total_ms = job.received_at.elapsed().as_millis();
1458 log::trace!(
1459 target: "myko_server::ws_perf",
1460 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1461 client_id,
1462 job.tx_id,
1463 command_id,
1464 queue_wait_ms,
1465 execute_ms,
1466 total_ms
1467 );
1468 break;
1469 }
1470 }
1471
1472 if !handler_found {
1473 log::warn!("No registered handler for command: {}", command_id);
1474 let error = MykoMessage::CommandError(CommandError {
1475 tx: job.tx_id.to_string(),
1476 command_id: command_id.clone(),
1477 message: format!("Command handler not found: {}", command_id),
1478 });
1479 if let Err(e) = priority_tx.try_send(error) {
1480 drop_logger.on_drop("CommandError", &e);
1481 }
1482 }
1483
1484 if !handler_found {
1485 log::debug!(
1486 target: "myko_server::ws_perf",
1487 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1488 client_id,
1489 job.tx_id,
1490 command_id,
1491 queue_wait_ms,
1492 job.received_at.elapsed().as_millis()
1493 );
1494 }
1495 }
1496}
1497
/// `WsWriter` implementation that hands outgoing traffic to channel senders
/// via `try_send` instead of writing to the socket itself.
struct ChannelWriter {
    /// Sender for `OutboundMessage`s; used by `send` and `send_serialized_command`.
    tx: mpsc::Sender<OutboundMessage>,
    /// Sender for `DeferredOutbound` items; used by `send_report_response` and
    /// `send_query_response`.
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Notified through `on_drop` whenever a `try_send` on either sender fails.
    drop_logger: Arc<DropLogger>,
    /// When `true`, `protocol()` reports `MSGPACK`; otherwise it reports `JSON`.
    use_binary_writer: Arc<AtomicBool>,
}
1508
1509impl WsWriter for ChannelWriter {
1510 fn send(&self, msg: MykoMessage) {
1511 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1513 self.drop_logger.on_drop("message", &e);
1514 }
1515 }
1516
1517 fn protocol(&self) -> myko::client::MykoProtocol {
1518 if self.use_binary_writer.load(Ordering::SeqCst) {
1519 myko::client::MykoProtocol::MSGPACK
1520 } else {
1521 myko::client::MykoProtocol::JSON
1522 }
1523 }
1524
1525 fn send_serialized_command(
1526 &self,
1527 tx: Arc<str>,
1528 command_id: String,
1529 payload: EncodedCommandMessage,
1530 ) {
1531 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1532 tx,
1533 command_id,
1534 payload,
1535 }) {
1536 self.drop_logger.on_drop("serialized_command", &e);
1537 }
1538 }
1539
1540 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1541 if let Err(e) = self
1542 .deferred_tx
1543 .try_send(DeferredOutbound::Report(tx, output))
1544 {
1545 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1546 }
1547 }
1548
1549 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1550 if let Err(e) = self
1551 .deferred_tx
1552 .try_send(DeferredOutbound::Query { response, is_view })
1553 {
1554 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1555 }
1556 }
1557}
1558
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` must appear on the outbound
    /// channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is kept alive but never read in this test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let delivered = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }
}