1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 client::MykoProtocol,
22 command::{CommandContext, CommandHandlerRegistration},
23 entities::client::{Client, ClientId},
24 relationship::{
25 iter_client_id_registrations, iter_fallback_to_id_registrations,
26 iter_server_owned_registrations,
27 },
28 report::AnyOutput,
29 request::RequestContext,
30 server::{
31 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
32 client_registry::try_client_registry,
33 },
34 wire::{
35 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
36 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
37 },
38};
39use tokio::{net::TcpStream, sync::mpsc, time::interval};
40use tokio_tungstenite::{
41 accept_async_with_config,
42 tungstenite::{Message, protocol::WebSocketConfig},
43};
44use uuid::Uuid;
45
/// Process-wide counters of events received over WebSocket, bucketed by entity type.
struct WsIngestStats {
    /// Number of events seen per `MEvent::item_type`. `record_ws_ingest` increments the
    /// counters; the ingest logger thread resets each one to zero when it reads it.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide message/byte counters reported by the benchmark logger thread.
///
/// NOTE(review): the code that increments these counters is not visible in this part of
/// the file; only the once-per-second read-and-reset in `ensure_ws_benchmark_logger` is.
struct WsBenchmarkStats {
    /// Messages counted since the logger thread last reset the counter.
    message_count: AtomicU64,
    /// Bytes counted since the logger thread last reset the counter.
    total_bytes: AtomicU64,
}
54
// Lazily initialised, process-wide ingest counters (see `ws_ingest_stats`).
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_ingest_logger`; later callers return early.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily initialised, process-wide benchmark counters (see `ws_benchmark_stats`).
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_benchmark_logger`; later callers return early.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 bytes / count
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 for reg in iter_server_owned_registrations() {
193 if reg.entity_type == event.item_type {
194 if let Some(obj) = event.item.as_object_mut() {
195 let field = reg.field_name_json;
196 let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
197 if current.is_empty() {
198 obj.insert(
199 field.to_string(),
200 serde_json::Value::String(host_id.to_string()),
201 );
202 }
203 }
204 break;
205 }
206 }
207
208 if let Some(obj) = event.item.as_object_mut()
211 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
212 {
213 for reg in iter_fallback_to_id_registrations() {
214 if reg.entity_type == event.item_type {
215 let field = reg.field_name_json;
216 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
217 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
218 }
219 }
220 }
221 }
222}
223
/// Rate-limited reporter for outbound messages that could not be queued for one client.
struct DropLogger {
    /// Id of the client whose messages are being dropped; included in the warning text.
    client_id: Arc<str>,
    /// Number of drops recorded since the last warning was emitted.
    dropped: std::sync::atomic::AtomicU64,
    /// Wall-clock time (milliseconds since the Unix epoch) of the last emitted warning;
    /// starts at 0.
    last_log_ms: std::sync::atomic::AtomicU64,
}
233
234impl DropLogger {
235 fn new(client_id: Arc<str>) -> Self {
236 Self {
237 client_id,
238 dropped: std::sync::atomic::AtomicU64::new(0),
239 last_log_ms: std::sync::atomic::AtomicU64::new(0),
240 }
241 }
242
243 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
244 use std::sync::atomic::Ordering;
245
246 self.dropped.fetch_add(1, Ordering::Relaxed);
247
248 let now_ms = std::time::SystemTime::now()
250 .duration_since(std::time::UNIX_EPOCH)
251 .unwrap_or_default()
252 .as_millis() as u64;
253 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
254 if now_ms.saturating_sub(last_ms) < 1000 {
255 return;
256 }
257
258 if self
259 .last_log_ms
260 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
261 .is_err()
262 {
263 return;
264 }
265
266 let n = self.dropped.swap(0, Ordering::Relaxed);
267 log::warn!(
268 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
269 n,
270 self.client_id,
271 kind,
272 err
273 );
274 }
275}
276
/// One client command queued for execution by the connection's command task.
struct CommandJob {
    /// Transaction id taken from the command payload's `"tx"` field (`"unknown"` if absent).
    tx_id: Arc<str>,
    /// Identifier of the command as sent by the client.
    command_id: String,
    /// Raw JSON payload of the command.
    command: serde_json::Value,
    /// When the command message was received from the client.
    received_at: Instant,
}
283
/// A subscription whose cell map was built on a blocking thread and is ready to be
/// installed on the client session by the connection's read loop.
enum SubscriptionReady {
    /// Result of a query cell factory; installed via `ClientSession::subscribe_query`.
    Query {
        /// Transaction id of the originating query request.
        tx_id: Arc<str>,
        /// Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// Result of a view cell factory; installed via `ClientSession::subscribe_view_with_id`.
    View {
        /// Transaction id of the originating view request.
        tx_id: Arc<str>,
        /// Identifier of the view that was requested.
        view_id: Arc<str>,
        /// Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the request.
        window: Option<myko::wire::QueryWindow>,
    },
}
298
/// A message travelling through the normal outbound queue to the write task.
enum OutboundMessage {
    /// A protocol message that the write task serializes as JSON or CBOR, depending on the
    /// connection's current outgoing format.
    Message(MykoMessage),
    /// A command whose payload is already encoded; the write task sends it as a text frame
    /// (JSON) or a binary frame (CBOR) without re-serializing.
    SerializedCommand {
        /// Transaction id of the command.
        tx: Arc<str>,
        /// Identifier of the command.
        command_id: String,
        /// Pre-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
307
/// An outbound item whose conversion to a wire message is postponed until the write task
/// picks it up.
enum DeferredOutbound {
    /// Report output for the given transaction id; becomes a `ReportResponse` via
    /// `AnyOutput::to_value`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view response; becomes a wire response via `into_wire`.
    Query {
        /// The response awaiting conversion.
        response: PendingQueryResponse,
        /// `true` to emit a `ViewResponse`, `false` to emit a `QueryResponse`.
        is_view: bool,
    },
}
315
/// Stateless namespace for the WebSocket connection handling functions.
pub struct WsHandler;
318
319impl WsHandler {
320 #[allow(clippy::too_many_arguments)]
322 pub async fn handle_connection(
323 stream: TcpStream,
324 addr: SocketAddr,
325 ctx: Arc<CellServerCtx>,
326 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
327 ensure_ws_ingest_logger();
328
329 let host_id = ctx.host_id;
330
331 let mut ws_config = WebSocketConfig::default();
332 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
333 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
334 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
335 let (mut write, mut read) = ws_stream.split();
336
337 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
340 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
341 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
342 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
343 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
344
345 let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
348
349 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
351 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
352 let writer = ChannelWriter {
353 tx: tx.clone(),
354 deferred_tx: deferred_tx.clone(),
355 drop_logger: drop_logger.clone(),
356 outgoing_format: outgoing_format.clone(),
357 };
358
359 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
361 tx: tx.clone(),
362 deferred_tx: deferred_tx.clone(),
363 drop_logger: drop_logger.clone(),
364 outgoing_format: outgoing_format.clone(),
365 });
366 if let Some(registry) = try_client_registry() {
367 registry.register(client_id.clone(), writer_arc);
368 }
369
370 let mut session = ClientSession::new(client_id.clone(), writer);
371
372 let outgoing_format_writer = outgoing_format.clone();
373 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
374 Arc::new(Mutex::new(HashMap::new()));
375 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
376 Arc::new(Mutex::new(HashMap::new()));
377 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
378 Arc::new(Mutex::new(HashMap::new()));
379 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
380 Arc::new(Mutex::new(HashMap::new()));
381 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
382 Arc::new(Mutex::new(HashMap::new()));
383
384 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
385
386 let client_entity = Client {
388 id: ClientId(client_id.clone()),
389 server_id: host_id.to_string().into(),
390 address: Some(Arc::from(addr.to_string())),
391 windback: None,
392 };
393 if let Err(e) = ctx.set(&client_entity) {
394 log::error!("Failed to persist client entity: {e}");
395 }
396
397 log::info!("Client connected: {} from {}", client_id, addr);
398
399 let write_ctx = ctx.clone();
400 let write_client_id = client_id.clone();
401 let write_addr = addr;
402 let command_ctx = ctx.clone();
403 let command_priority_tx = priority_tx.clone();
404 let command_drop_logger = drop_logger.clone();
405 let command_client_id = client_id.clone();
406
407 let write_task = tokio::spawn(async move {
409 let _ctx = write_ctx;
410 let mut normal_open = true;
411 let mut priority_open = true;
412 let mut deferred_open = true;
413 while normal_open || priority_open || deferred_open {
414 let msg = tokio::select! {
415 biased;
416 maybe = priority_rx.recv(), if priority_open => {
417 match maybe {
418 Some(msg) => OutboundMessage::Message(msg),
419 None => {
420 priority_open = false;
421 continue;
422 }
423 }
424 }
425 maybe = deferred_rx.recv(), if deferred_open => {
426 match maybe {
427 Some(DeferredOutbound::Report(tx, output)) => {
428 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
429 response: output.to_value(),
430 tx: tx.to_string(),
431 }))
432 }
433 Some(DeferredOutbound::Query { response, is_view }) => {
434 if is_view {
435 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
436 } else {
437 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
438 }
439 }
440 None => {
441 deferred_open = false;
442 continue;
443 }
444 }
445 }
446 maybe = rx.recv(), if normal_open => {
447 match maybe {
448 Some(msg) => msg,
449 None => {
450 normal_open = false;
451 continue;
452 }
453 }
454 }
455 };
456 match &msg {
460 OutboundMessage::SerializedCommand { .. } => {
461 crate::ws_timing::record_outbound("Command")
462 }
463 OutboundMessage::Message(m) => {
464 crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
465 }
466 };
467 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
468 OutboundMessage::SerializedCommand { tx, .. } => {
469 ("command", Some(tx.clone()), None, None, None, None)
470 }
471 OutboundMessage::Message(msg) => match msg {
472 MykoMessage::ViewResponse(r) => (
473 "view_response",
474 Some(r.tx.clone()),
475 Some(r.sequence),
476 Some(r.upserts.len()),
477 Some(r.deletes.len()),
478 r.total_count,
479 ),
480 MykoMessage::QueryResponse(r) => (
481 "query_response",
482 Some(r.tx.clone()),
483 Some(r.sequence),
484 Some(r.upserts.len()),
485 Some(r.deletes.len()),
486 r.total_count,
487 ),
488 MykoMessage::CommandResponse(r) => (
489 "command_response",
490 Some(Arc::<str>::from(r.tx.clone())),
491 None,
492 None,
493 None,
494 None,
495 ),
496 MykoMessage::CommandError(r) => (
497 "command_error",
498 Some(Arc::<str>::from(r.tx.clone())),
499 None,
500 None,
501 None,
502 None,
503 ),
504 _ => ("other", None, None, None, None, None),
505 },
506 };
507
508 match &msg {
509 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
510 if !tx.trim().is_empty()
511 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
512 {
513 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
514 }
515 }
516 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
517 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
518 && !tx_id.trim().is_empty()
519 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
520 {
521 map.insert(
522 tx_id.to_string(),
523 (wrapped.command_id.clone(), Instant::now()),
524 );
525 }
526 }
527 _ => {}
528 }
529
530 let ws_msg = match &msg {
531 OutboundMessage::SerializedCommand {
532 payload: EncodedCommandMessage::Json(json),
533 ..
534 } => Message::Text(json.clone().into()),
535 OutboundMessage::SerializedCommand {
536 payload: EncodedCommandMessage::Cbor(bytes),
537 ..
538 } => Message::Binary(bytes.clone().into()),
539 OutboundMessage::Message(msg)
540 if outgoing_format_writer.load(Ordering::SeqCst)
541 == MykoProtocol::CBOR as u8 =>
542 {
543 let mut bytes = Vec::new();
544 match ciborium::ser::into_writer(msg, &mut bytes) {
545 Ok(()) => Message::Binary(bytes.into()),
546 Err(e) => {
547 log::error!("Failed to serialize message to CBOR: {}", e);
548 continue;
549 }
550 }
551 }
552 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
553 Ok(json) => Message::Text(json.into()),
554 Err(e) => {
555 log::error!("Failed to serialize message to JSON: {}", e);
556 continue;
557 }
558 },
559 };
560 let payload_bytes = match &ws_msg {
561 Message::Binary(b) => b.len(),
562 Message::Text(t) => t.len(),
563 _ => 0,
564 };
565
566 if let Err(err) = write.send(ws_msg).await {
567 log::error!(
568 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
569 write_client_id,
570 write_addr,
571 kind,
572 tx_id,
573 seq,
574 payload_bytes,
575 outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
576 err
577 );
578 break;
579 }
580 }
581 if let Some(registry) = try_client_registry() {
584 registry.unregister(&write_client_id);
585 log::info!(
586 "WebSocket writer unregistered client {} from {} (write task exiting)",
587 write_client_id,
588 write_addr,
589 );
590 }
591 log::warn!(
592 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
593 write_client_id,
594 write_addr,
595 normal_open,
596 priority_open,
597 deferred_open
598 );
599 });
600
601 let command_started_cleanup = command_started_by_tx.clone();
604 let command_task = tokio::spawn(async move {
605 while let Some(job) = command_rx.recv().await {
606 let command_ctx = command_ctx.clone();
607 let command_priority_tx = command_priority_tx.clone();
608 let command_drop_logger = command_drop_logger.clone();
609 let command_client_id = command_client_id.clone();
610 let tx_id = job.tx_id.clone();
611 let started_map = command_started_cleanup.clone();
612 match tokio::task::spawn_blocking(move || {
613 Self::execute_command_job(
614 command_ctx,
615 &command_priority_tx,
616 command_drop_logger.as_ref(),
617 command_client_id,
618 job,
619 );
620 })
621 .await
622 {
623 Ok(()) => {}
624 Err(e) => {
625 log::error!("Command worker panicked: {}", e);
626 }
627 }
628 if let Ok(mut map) = started_map.lock() {
630 map.remove(&tx_id);
631 }
632 }
633 });
634
635 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
639 outbound_ttl_interval.tick().await; loop {
641 tokio::select! {
642 Some(ready) = subscribe_rx.recv() => {
644 let tx_id = match &ready {
645 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
646 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
647 };
648 if let Ok(mut map) = subscribe_started_by_tx.lock() {
649 map.remove(&tx_id);
650 }
651 match ready {
652 SubscriptionReady::Query { tx_id, cellmap, window } => {
653 session.subscribe_query(tx_id, cellmap, window);
654 }
655 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
656 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
657 }
658 }
659 }
660 _ = outbound_ttl_interval.tick() => {
664 if let Ok(mut map) = outbound_commands_by_tx.lock() {
665 let before = map.len();
666 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
667 let removed = before - map.len();
668 if removed > 0 {
669 log::debug!(
670 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
671 session.client_id,
672 removed,
673 map.len()
674 );
675 }
676 }
677 }
678 msg = read.next() => {
680 let Some(msg) = msg else { break };
681 let ctx = ctx.clone();
682 let msg = match msg {
683 Ok(m) => m,
684 Err(e) => {
685 log::error!("WebSocket read error from {}: {}", client_id, e);
686 break;
687 }
688 };
689
690 match msg {
691 Message::Binary(data) => {
692 if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
693 log::debug!(
694 "Client {} promoted outgoing format to CBOR via demonstration",
695 client_id
696 );
697 outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
698 }
699
700 match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
701 Ok(myko_msg) => {
702 if let Err(e) = Self::handle_message(
703 &mut session,
704 ctx,
705 &priority_tx,
706 &drop_logger,
707 &query_ids_by_tx,
708 &view_ids_by_tx,
709 &subscribe_started_by_tx,
710 &command_started_by_tx,
711 &outbound_commands_by_tx,
712 &command_tx,
713 &subscribe_tx,
714 myko_msg,
715 ) {
716 log::error!("Error handling message: {}", e);
717 }
718 tokio::task::yield_now().await;
719 }
720 Err(e) => {
721 log::warn!("Failed to parse message from {}: {}", client_id, e);
722 }
723 }
724 }
725 Message::Text(text) => {
726 match serde_json::from_str::<MykoMessage>(&text) {
727 Ok(myko_msg) => {
728 if let Err(e) = Self::handle_message(
729 &mut session,
730 ctx,
731 &priority_tx,
732 &drop_logger,
733 &query_ids_by_tx,
734 &view_ids_by_tx,
735 &subscribe_started_by_tx,
736 &command_started_by_tx,
737 &outbound_commands_by_tx,
738 &command_tx,
739 &subscribe_tx,
740 myko_msg,
741 ) {
742 log::error!("Error handling message: {}", e);
743 }
744 tokio::task::yield_now().await;
745 }
746 Err(e) => {
747 log::warn!(
748 "Failed to parse JSON message from {}: {} | raw: {}",
749 client_id,
750 e,
751 if text.len() > 1000 {
752 &text[..1000]
753 } else {
754 &text
755 }
756 );
757 }
758 }
759 }
760 Message::Ping(data) => {
761 log::trace!("Ping from {}", client_id);
762 let _ = data;
763 }
764 Message::Pong(_) => {
765 log::trace!("Pong from {}", client_id);
766 }
767 Message::Close(frame) => {
768 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
769 break;
770 }
771 Message::Frame(_) => {}
772 }
773 }
774 }
775 }
776
777 write_task.abort();
793 command_task.abort();
794 if let Some(registry) = try_client_registry() {
795 registry.unregister(&client_id);
796 }
797
798 let drop_client_id = client_id.clone();
799 tokio::task::spawn_blocking(move || {
800 drop(session); log::trace!(
802 "Client session subscriptions torn down for {}",
803 drop_client_id
804 );
805 });
806
807 if let Err(e) = ctx.del(&client_entity) {
809 log::error!("Failed to delete client entity: {e}");
810 }
811
812 log::info!("Client disconnected: {} from {}", client_id, addr);
813
814 Ok(())
815 }
816
817 #[allow(clippy::too_many_arguments)]
819 fn handle_message<W: WsWriter>(
820 session: &mut ClientSession<W>,
821 ctx: Arc<CellServerCtx>,
822 priority_tx: &mpsc::Sender<MykoMessage>,
823 drop_logger: &Arc<DropLogger>,
824 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
825 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
826 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
827 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
828 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
829 command_tx: &mpsc::UnboundedSender<CommandJob>,
830 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
831 msg: MykoMessage,
832 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
833 crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
836
837 let handler_registry = ctx.handler_registry.clone();
838
839 let registry = ctx.registry.clone();
840
841 let host_id = ctx.host_id;
842
843 match msg {
844 MykoMessage::Query(wrapped) => {
845 let tx_id: Arc<str> = wrapped
847 .query
848 .get("tx")
849 .and_then(|v| v.as_str())
850 .unwrap_or("unknown")
851 .into();
852 let query_id = &wrapped.query_id;
853 let entity_type = &wrapped.query_item_type;
854
855 if session.has_subscription(&tx_id) {
858 log::debug!(
859 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
860 session.client_id,
861 tx_id,
862 query_id,
863 entity_type
864 );
865 return Ok(());
866 }
867 if let Ok(mut map) = query_ids_by_tx.lock() {
868 map.insert(tx_id.clone(), query_id.clone());
869 }
870 if let Ok(mut map) = subscribe_started_by_tx.lock() {
871 map.entry(tx_id.clone()).or_insert_with(Instant::now);
872 }
873
874 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
875 log::trace!(
876 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
877 session.client_id,
878 tx_id,
879 query_id,
880 entity_type,
881 wrapped.window.is_some(),
882 session.subscription_count()
883 );
884
885 let request_context = Arc::new(RequestContext::from_client(
886 tx_id.clone(),
887 session.client_id.clone(),
888 host_id,
889 ));
890
891 if let Some(query_data) = handler_registry.get_query(query_id) {
892 let parsed = (query_data.parse)(wrapped.query.clone());
893 match parsed {
894 Ok(any_query) => {
895 let cell_factory = query_data.cell_factory;
898 let registry = registry.clone();
899 let request_context = request_context.clone();
900 let ctx = ctx.clone();
901 let window = wrapped.window.clone();
902 let query_id = query_id.clone();
903 let sub_tx = subscribe_tx.clone();
904 tokio::task::spawn_blocking(move || {
905 match cell_factory(any_query, registry, request_context, Some(ctx))
906 {
907 Ok(filtered_cellmap) => {
908 let _ = sub_tx.send(SubscriptionReady::Query {
909 tx_id,
910 cellmap: filtered_cellmap,
911 window,
912 });
913 }
914 Err(e) => {
915 log::error!(
916 "Failed to create query cell for {}: {}",
917 query_id,
918 e
919 );
920 }
921 }
922 });
923 }
924 Err(e) => {
925 log::error!(
926 "Failed to parse query {}: {} | payload: {}",
927 query_id,
928 e,
929 serde_json::to_string(&wrapped.query).unwrap_or_default()
930 );
931 }
932 }
933 } else {
934 log::warn!(
936 "No registered query handler for {}, falling back to select all",
937 query_id
938 );
939 let store: myko::store::EntityStore =
940 (*registry.get_or_create(entity_type)).clone();
941 let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
942 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
943 }
944 }
945
946 MykoMessage::View(wrapped) => {
947 let tx_id: Arc<str> = wrapped
948 .view
949 .get("tx")
950 .and_then(|v| v.as_str())
951 .unwrap_or("unknown")
952 .into();
953 let view_id = &wrapped.view_id;
954 let item_type = &wrapped.view_item_type;
955
956 if session.has_subscription(&tx_id) {
958 log::debug!(
959 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
960 session.client_id,
961 tx_id,
962 view_id,
963 item_type
964 );
965 return Ok(());
966 }
967 if let Ok(mut map) = view_ids_by_tx.lock() {
968 map.insert(tx_id.clone(), view_id.clone());
969 }
970 if let Ok(mut map) = subscribe_started_by_tx.lock() {
971 map.entry(tx_id.clone()).or_insert_with(Instant::now);
972 }
973
974 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
975 log::trace!(
976 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
977 session.client_id,
978 tx_id,
979 view_id,
980 item_type,
981 wrapped.window
982 );
983
984 let request_context = Arc::new(RequestContext::from_client(
985 tx_id.clone(),
986 session.client_id.clone(),
987 host_id,
988 ));
989
990 if let Some(view_data) = handler_registry.get_view(view_id) {
991 let parsed = (view_data.parse)(wrapped.view.clone());
992 match parsed {
993 Ok(any_view) => {
994 log::trace!(
995 "View parsed successfully client={} tx={} view_id={}",
996 session.client_id,
997 tx_id,
998 view_id
999 );
1000 let cell_factory = view_data.cell_factory;
1003 let registry = registry.clone();
1004 let ctx = ctx.clone();
1005 let window = wrapped.window.clone();
1006 let view_id_clone = view_id.clone();
1007 let sub_tx = subscribe_tx.clone();
1008 let priority_tx = priority_tx.clone();
1009 let drop_logger = drop_logger.clone();
1010 tokio::task::spawn_blocking(move || {
1011 match cell_factory(any_view, registry, request_context, ctx) {
1012 Ok(filtered_cellmap) => {
1013 log::trace!(
1014 "View cell factory succeeded tx={} view_id={}",
1015 tx_id,
1016 view_id_clone
1017 );
1018 let _ = sub_tx.send(SubscriptionReady::View {
1019 tx_id,
1020 view_id: view_id_clone,
1021 cellmap: filtered_cellmap,
1022 window,
1023 });
1024 }
1025 Err(e) => {
1026 log::error!(
1027 "Failed to create view cell for {}: {}",
1028 view_id_clone,
1029 e
1030 );
1031 if let Err(err) = priority_tx.try_send(
1032 MykoMessage::ViewError(ViewError {
1033 tx: tx_id.to_string(),
1034 view_id: view_id_clone.to_string(),
1035 message: e,
1036 }),
1037 ) {
1038 drop_logger.on_drop("ViewError", &err);
1039 }
1040 }
1041 }
1042 });
1043 }
1044 Err(e) => {
1045 let message = format!("Failed to parse view {}: {}", view_id, e);
1046 log::error!(
1047 "{} | payload: {}",
1048 message,
1049 serde_json::to_string(&wrapped.view).unwrap_or_default()
1050 );
1051 if let Err(err) =
1052 priority_tx.try_send(MykoMessage::ViewError(ViewError {
1053 tx: tx_id.to_string(),
1054 view_id: view_id.to_string(),
1055 message,
1056 }))
1057 {
1058 drop_logger.on_drop("ViewError", &err);
1059 }
1060 }
1061 }
1062 } else {
1063 let message = format!("No registered handler for view: {}", view_id);
1064 log::warn!("{}", message);
1065 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1066 tx: tx_id.to_string(),
1067 view_id: view_id.to_string(),
1068 message,
1069 })) {
1070 drop_logger.on_drop("ViewError", &err);
1071 }
1072 }
1073 }
1074
1075 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1076 log::trace!(
1077 "QueryCancel received: client={} tx={}",
1078 session.client_id,
1079 tx_id
1080 );
1081 let tx_id: Arc<str> = tx_id.into();
1082 if let Ok(mut map) = query_ids_by_tx.lock() {
1083 map.remove(&tx_id);
1084 }
1085 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1086 map.remove(&tx_id);
1087 }
1088 session.cancel(&tx_id);
1089 }
1090
1091 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1092 let tx_id: Arc<str> = tx.into();
1093 log::trace!(
1094 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1095 session.client_id,
1096 tx_id,
1097 window.is_some(),
1098 session.subscription_count()
1099 );
1100 session.update_query_window(&tx_id, window);
1101 }
1102 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1103 log::trace!("View cancel: {}", tx_id);
1104 let tx_id: Arc<str> = tx_id.into();
1105 if let Ok(mut map) = view_ids_by_tx.lock() {
1106 map.remove(&tx_id);
1107 }
1108 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1109 map.remove(&tx_id);
1110 }
1111 session.cancel(&tx_id);
1112 }
1113 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1114 let tx_id: Arc<str> = tx.into();
1115 log::trace!("View window update: {}", tx_id);
1116 session.update_view_window(&tx_id, window);
1117 }
1118
1119 MykoMessage::Report(wrapped) => {
1120 let tx_id: Arc<str> = wrapped
1122 .report
1123 .get("tx")
1124 .and_then(|v| v.as_str())
1125 .unwrap_or("unknown")
1126 .into();
1127 let report_id = &wrapped.report_id;
1128
1129 log::trace!(
1130 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1131 session.client_id,
1132 tx_id,
1133 report_id,
1134 session.subscription_count()
1135 );
1136
1137 if let Some(report_data) = handler_registry.get_report(report_id) {
1139 let parsed = (report_data.parse)(wrapped.report.clone());
1141 match parsed {
1142 Ok(any_report) => {
1143 let request_context = Arc::new(RequestContext::from_client(
1144 tx_id.clone(),
1145 session.client_id.clone(),
1146 host_id,
1147 ));
1148
1149 match (report_data.cell_factory)(any_report, request_context, ctx) {
1151 Ok(cell) => {
1152 session.subscribe_report(
1153 tx_id,
1154 report_id.as_str().into(),
1155 cell,
1156 );
1157 }
1158 Err(e) => {
1159 log::error!(
1160 "Failed to create report cell for {}: {}",
1161 report_id,
1162 e
1163 );
1164 }
1165 }
1166 }
1167 Err(e) => {
1168 log::error!(
1169 "Failed to parse report {}: {} | payload: {}",
1170 report_id,
1171 e,
1172 serde_json::to_string(&wrapped.report).unwrap_or_default()
1173 );
1174 }
1175 }
1176 } else {
1177 log::warn!("No registered handler for report: {}", report_id);
1178 }
1179 }
1180
1181 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1182 log::trace!(
1183 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1184 session.client_id,
1185 tx_id,
1186 session.subscription_count()
1187 );
1188 session.cancel(&tx_id.into());
1189 }
1190
1191 MykoMessage::Event(mut event) => {
1192 record_ws_ingest(std::slice::from_ref(&event));
1193 event.sanitize_null_bytes();
1194 normalize_incoming_event(&mut event, &session.client_id, host_id);
1195 if let Err(e) = ctx.apply_event(event) {
1196 log::error!(
1197 "Failed to apply event from client {}: {e}",
1198 session.client_id
1199 );
1200 }
1201 }
1202
1203 MykoMessage::EventBatch(mut events) => {
1204 record_ws_ingest(&events);
1205 let incoming = events.len();
1206 if incoming >= 64 {
1207 log::trace!(
1208 "Received event batch from client {} size={}",
1209 session.client_id,
1210 incoming
1211 );
1212 }
1213 for event in &mut events {
1214 event.sanitize_null_bytes();
1215 normalize_incoming_event(event, &session.client_id, host_id);
1216 }
1217 match ctx.apply_event_batch(events) {
1218 Ok(applied) => {
1219 log::trace!(
1220 "Applied event batch from client {} size={}",
1221 session.client_id,
1222 applied
1223 );
1224 }
1225 Err(e) => {
1226 log::error!(
1227 "Failed to apply event batch from client {}: {}",
1228 session.client_id,
1229 e
1230 );
1231 }
1232 }
1233 }
1234
1235 MykoMessage::Command(wrapped) => {
1236 let tx_id: Arc<str> = wrapped
1238 .command
1239 .get("tx")
1240 .and_then(|v| v.as_str())
1241 .unwrap_or("unknown")
1242 .into();
1243
1244 let command_id = &wrapped.command_id;
1245
1246 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1247 let received_at = Instant::now();
1248 if let Ok(mut map) = command_started_by_tx.lock() {
1249 map.insert(tx_id.clone(), received_at);
1250 }
1251 if let Err(e) = command_tx.send(CommandJob {
1252 tx_id: tx_id.clone(),
1253 command_id: wrapped.command_id.clone(),
1254 command: wrapped.command.clone(),
1255 received_at,
1256 }) {
1257 log::error!(
1258 "Failed to enqueue command {} for client {} tx={}: {}",
1259 command_id,
1260 session.client_id,
1261 tx_id,
1262 e
1263 );
1264 let error = MykoMessage::CommandError(CommandError {
1265 tx: tx_id.to_string(),
1266 command_id: command_id.to_string(),
1267 message: "Command queue unavailable".to_string(),
1268 });
1269 if let Err(err) = priority_tx.try_send(error) {
1270 drop_logger.on_drop("CommandError", &err);
1271 }
1272 }
1273 }
1274
1275 MykoMessage::Ping(PingData { id, timestamp }) => {
1276 let pong = MykoMessage::Ping(PingData { id, timestamp });
1278 if let Err(e) = priority_tx.try_send(pong) {
1279 drop_logger.on_drop("Ping", &e);
1280 }
1281 }
1282
1283 MykoMessage::QueryResponse(resp) => {
1285 log::warn!(
1286 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1287 session.client_id,
1288 resp.tx,
1289 resp.sequence,
1290 resp.upserts.len(),
1291 resp.deletes.len(),
1292 session.subscription_count()
1293 );
1294 }
1295 MykoMessage::QueryError(err) => {
1296 log::warn!(
1297 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1298 session.client_id,
1299 err.tx,
1300 err.query_id,
1301 err.message,
1302 session.subscription_count()
1303 );
1304 }
1305 MykoMessage::ViewResponse(resp) => {
1306 log::warn!(
1307 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1308 session.client_id,
1309 resp.tx,
1310 resp.sequence,
1311 resp.upserts.len(),
1312 resp.deletes.len(),
1313 session.subscription_count()
1314 );
1315 }
1316 MykoMessage::ViewError(err) => {
1317 log::warn!(
1318 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1319 session.client_id,
1320 err.tx,
1321 err.view_id,
1322 err.message,
1323 session.subscription_count()
1324 );
1325 }
1326 MykoMessage::ReportResponse(resp) => {
1327 log::warn!(
1328 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1329 session.client_id,
1330 resp.tx,
1331 session.subscription_count()
1332 );
1333 }
1334 MykoMessage::ReportError(err) => {
1335 log::warn!(
1336 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1337 session.client_id,
1338 err.tx,
1339 err.report_id,
1340 err.message,
1341 session.subscription_count()
1342 );
1343 }
1344 MykoMessage::CommandResponse(resp) => {
1345 if resp.tx.trim().is_empty() {
1346 log::warn!(
1347 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1348 session.client_id,
1349 session.subscription_count()
1350 );
1351 } else {
1352 let correlated = outbound_commands_by_tx
1353 .lock()
1354 .ok()
1355 .and_then(|mut map| map.remove(&resp.tx));
1356 if let Some((command_id, started)) = correlated {
1357 log::debug!(
1358 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1359 session.client_id,
1360 resp.tx,
1361 command_id,
1362 started.elapsed().as_millis(),
1363 session.subscription_count()
1364 );
1365 } else {
1366 log::warn!(
1367 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1368 session.client_id,
1369 resp.tx,
1370 session.subscription_count()
1371 );
1372 }
1373 }
1374 }
1375 MykoMessage::CommandError(err) => {
1376 if err.tx.trim().is_empty() {
1377 log::warn!(
1378 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1379 session.client_id,
1380 err.command_id,
1381 err.message,
1382 session.subscription_count()
1383 );
1384 } else {
1385 let correlated = outbound_commands_by_tx
1386 .lock()
1387 .ok()
1388 .and_then(|mut map| map.remove(&err.tx));
1389 if let Some((command_id, started)) = correlated {
1390 log::warn!(
1391 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1392 session.client_id,
1393 err.tx,
1394 err.command_id,
1395 command_id,
1396 err.message,
1397 started.elapsed().as_millis(),
1398 session.subscription_count()
1399 );
1400 } else {
1401 log::warn!(
1402 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1403 session.client_id,
1404 err.tx,
1405 err.command_id,
1406 err.message,
1407 session.subscription_count()
1408 );
1409 }
1410 }
1411 }
1412 MykoMessage::Benchmark(payload) => {
1413 let stats = ws_benchmark_stats();
1414 ensure_ws_benchmark_logger();
1415 stats.message_count.fetch_add(1, Ordering::Relaxed);
1416 let size = payload.to_string().len() as u64;
1418 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1419 }
1420 }
1421
1422 Ok(())
1423 }
1424
1425 fn execute_command_job(
1426 ctx: Arc<CellServerCtx>,
1427 priority_tx: &mpsc::Sender<MykoMessage>,
1428 drop_logger: &DropLogger,
1429 client_id: Arc<str>,
1430 job: CommandJob,
1431 ) {
1432 let host_id = ctx.host_id;
1433 let started = Instant::now();
1434 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1435 let command_id = job.command_id.clone();
1436
1437 let mut handler_found = false;
1438 for registration in inventory::iter::<CommandHandlerRegistration> {
1439 if registration.command_id == command_id {
1440 handler_found = true;
1441 let executor = (registration.factory)();
1442
1443 let req = Arc::new(RequestContext::from_client(
1444 job.tx_id.clone(),
1445 client_id.clone(),
1446 host_id,
1447 ));
1448 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1449 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1450 let execute_started = Instant::now();
1451
1452 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1453 Ok(result) => {
1454 let response = MykoMessage::CommandResponse(CommandResponse {
1455 response: result,
1456 tx: job.tx_id.to_string(),
1457 });
1458 if let Err(e) = priority_tx.try_send(response) {
1459 drop_logger.on_drop("CommandResponse", &e);
1460 }
1461 }
1462 Err(e) => {
1463 let error = MykoMessage::CommandError(CommandError {
1464 tx: job.tx_id.to_string(),
1465 command_id: command_id.clone(),
1466 message: e.message,
1467 });
1468 if let Err(err) = priority_tx.try_send(error) {
1469 drop_logger.on_drop("CommandError", &err);
1470 }
1471 }
1472 }
1473 let execute_ms = execute_started.elapsed().as_millis();
1474 let total_ms = job.received_at.elapsed().as_millis();
1475 log::trace!(
1476 target: "myko_server::ws_perf",
1477 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1478 client_id,
1479 job.tx_id,
1480 command_id,
1481 queue_wait_ms,
1482 execute_ms,
1483 total_ms
1484 );
1485 break;
1486 }
1487 }
1488
1489 if !handler_found {
1490 log::warn!("No registered handler for command: {}", command_id);
1491 let error = MykoMessage::CommandError(CommandError {
1492 tx: job.tx_id.to_string(),
1493 command_id: command_id.clone(),
1494 message: format!("Command handler not found: {}", command_id),
1495 });
1496 if let Err(e) = priority_tx.try_send(error) {
1497 drop_logger.on_drop("CommandError", &e);
1498 }
1499 }
1500
1501 if !handler_found {
1502 log::debug!(
1503 target: "myko_server::ws_perf",
1504 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1505 client_id,
1506 job.tx_id,
1507 command_id,
1508 queue_wait_ms,
1509 job.received_at.elapsed().as_millis()
1510 );
1511 }
1512 }
1513}
1514
1515struct ChannelWriter {
1520 tx: mpsc::Sender<OutboundMessage>,
1521 deferred_tx: mpsc::Sender<DeferredOutbound>,
1522 drop_logger: Arc<DropLogger>,
1523 outgoing_format: Arc<AtomicU8>,
1524}
1525
1526impl ChannelWriter {
1527 #[inline]
1535 fn tx_dead(&self) -> bool {
1536 self.tx.is_closed()
1537 }
1538
1539 #[inline]
1540 fn deferred_dead(&self) -> bool {
1541 self.deferred_tx.is_closed()
1542 }
1543}
1544
1545impl WsWriter for ChannelWriter {
1546 fn send(&self, msg: MykoMessage) {
1547 if self.tx_dead() {
1554 return;
1555 }
1556 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1557 if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1560 self.drop_logger.on_drop("message", &e);
1561 }
1562 }
1563 }
1564
1565 fn protocol(&self) -> MykoProtocol {
1566 MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1567 }
1568
1569 fn send_serialized_command(
1570 &self,
1571 tx: Arc<str>,
1572 command_id: String,
1573 payload: EncodedCommandMessage,
1574 ) {
1575 if self.tx_dead() {
1576 return;
1577 }
1578 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1579 tx,
1580 command_id,
1581 payload,
1582 }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1583 {
1584 self.drop_logger.on_drop("serialized_command", &e);
1585 }
1586 }
1587
1588 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1589 if self.deferred_dead() {
1590 return;
1591 }
1592 if let Err(e) = self
1593 .deferred_tx
1594 .try_send(DeferredOutbound::Report(tx, output))
1595 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1596 {
1597 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1598 }
1599 }
1600
1601 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1602 if self.deferred_dead() {
1603 return;
1604 }
1605 if let Err(e) = self
1606 .deferred_tx
1607 .try_send(DeferredOutbound::Query { response, is_view })
1608 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1609 {
1610 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1611 }
1612 }
1613}
1614
#[cfg(test)]
mod tests {
    use super::*;

    /// A message given to `ChannelWriter::send` arrives on the outbound
    /// channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is kept alive for the duration of the test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format: Arc::new(AtomicU8::new(MykoProtocol::JSON as u8)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let received = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    /// The stored format decodes to JSON initially and to CBOR after a store,
    /// and a repeated load still decodes to CBOR.
    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        use std::sync::atomic::{AtomicU8, Ordering};

        let decode = |cell: &AtomicU8| MykoProtocol::from(cell.load(Ordering::SeqCst));
        let outgoing_format = AtomicU8::new(MykoProtocol::JSON as u8);

        assert_eq!(decode(&outgoing_format), MykoProtocol::JSON);

        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(decode(&outgoing_format), MykoProtocol::CBOR);

        // No store happened in between, so the value must be unchanged.
        assert_eq!(decode(&outgoing_format), MykoProtocol::CBOR);
    }
}