1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 client::MykoProtocol,
22 command::{CommandContext, CommandHandlerRegistration},
23 entities::client::{Client, ClientId},
24 relationship::{
25 iter_client_id_registrations, iter_fallback_to_id_registrations,
26 iter_server_owned_registrations,
27 },
28 report::AnyOutput,
29 request::RequestContext,
30 server::{
31 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
32 client_registry::try_client_registry,
33 },
34 wire::{
35 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
36 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
37 },
38};
39use tokio::{net::TcpStream, sync::mpsc, time::interval};
40use tokio_tungstenite::{
41 accept_async_with_config,
42 tungstenite::{Message, protocol::WebSocketConfig},
43};
44use uuid::Uuid;
45
/// Process-wide counters of events received over WebSocket connections.
struct WsIngestStats {
    /// One counter per event `item_type`. `record_ws_ingest` increments these and the
    /// ingest logger thread resets each one to zero when it reports.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide WebSocket throughput counters reported by the benchmark logger thread.
///
/// NOTE(review): the code that increments these counters is not in this part of the
/// file — confirm where they are fed from.
struct WsBenchmarkStats {
    /// Messages counted since the logger last reset the value to zero.
    message_count: AtomicU64,
    /// Bytes counted since the logger last reset the value to zero.
    total_bytes: AtomicU64,
}
54
// Lazily created shared ingest counters; see `ws_ingest_stats`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Set to `true` by the first `ensure_ws_ingest_logger` call so only one logger thread starts.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily created shared benchmark counters; see `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Set to `true` by the first `ensure_ws_benchmark_logger` call so only one logger thread starts.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 bytes / count
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 for reg in iter_server_owned_registrations() {
193 if reg.entity_type == event.item_type {
194 if let Some(obj) = event.item.as_object_mut() {
195 let field = reg.field_name_json;
196 let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
197 if current.is_empty() {
198 obj.insert(
199 field.to_string(),
200 serde_json::Value::String(host_id.to_string()),
201 );
202 }
203 }
204 break;
205 }
206 }
207
208 if let Some(obj) = event.item.as_object_mut()
211 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
212 {
213 for reg in iter_fallback_to_id_registrations() {
214 if reg.entity_type == event.item_type {
215 let field = reg.field_name_json;
216 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
217 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
218 }
219 }
220 }
221 }
222}
223
/// Rate-limited reporter for outbound messages that could not be queued for a client.
struct DropLogger {
    /// Client id included in the warning message.
    client_id: Arc<str>,
    /// Number of drops recorded since the last warning was emitted.
    dropped: std::sync::atomic::AtomicU64,
    /// Wall-clock time (milliseconds since the Unix epoch) of the last emitted warning;
    /// zero until the first warning.
    last_log_ms: std::sync::atomic::AtomicU64,
}
233
234impl DropLogger {
235 fn new(client_id: Arc<str>) -> Self {
236 Self {
237 client_id,
238 dropped: std::sync::atomic::AtomicU64::new(0),
239 last_log_ms: std::sync::atomic::AtomicU64::new(0),
240 }
241 }
242
243 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
244 use std::sync::atomic::Ordering;
245
246 self.dropped.fetch_add(1, Ordering::Relaxed);
247
248 let now_ms = std::time::SystemTime::now()
250 .duration_since(std::time::UNIX_EPOCH)
251 .unwrap_or_default()
252 .as_millis() as u64;
253 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
254 if now_ms.saturating_sub(last_ms) < 1000 {
255 return;
256 }
257
258 if self
259 .last_log_ms
260 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
261 .is_err()
262 {
263 return;
264 }
265
266 let n = self.dropped.swap(0, Ordering::Relaxed);
267 log::warn!(
268 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
269 n,
270 self.client_id,
271 kind,
272 err
273 );
274 }
275}
276
/// A client command queued for execution by the per-connection command task.
struct CommandJob {
    /// Transaction id taken from the command payload's `tx` field (`"unknown"` if absent).
    tx_id: Arc<str>,
    /// Identifier of the command to run.
    command_id: String,
    /// Raw JSON payload of the command.
    command: serde_json::Value,
    /// When the command message was received from the client.
    received_at: Instant,
}
283
/// Result of a query/view cell factory that ran on a blocking thread, handed back to the
/// connection loop so the subscription can be attached to the client session.
enum SubscriptionReady {
    /// A query cell map ready to be passed to `ClientSession::subscribe_query`.
    Query {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view cell map ready to be passed to `ClientSession::subscribe_view_with_id`.
    View {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// Identifier of the view that was requested.
        view_id: Arc<str>,
        /// Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the subscribe request.
        window: Option<myko::wire::QueryWindow>,
    },
}
298
/// A message queued for the writer task on the normal-priority channel.
enum OutboundMessage {
    /// A protocol message; the writer task serializes it as CBOR or JSON depending on the
    /// connection's current outgoing format.
    Message(MykoMessage),
    /// A command that is already encoded; the writer sends the payload as-is (text frame
    /// for JSON, binary frame for CBOR).
    SerializedCommand {
        /// Transaction id of the command; used to track outbound commands.
        tx: Arc<str>,
        /// Identifier of the command.
        command_id: String,
        /// The pre-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
307
/// An outbound item whose conversion to a wire message is deferred to the writer task.
enum DeferredOutbound {
    /// A report output for the given transaction id; the writer turns it into a
    /// `ReportResponse` via `to_value()`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// A pending query/view response; the writer converts it with `into_wire()`.
    Query {
        /// The response awaiting conversion.
        response: PendingQueryResponse,
        /// `true` to send it as a `ViewResponse`, `false` as a `QueryResponse`.
        is_view: bool,
    },
}
315
/// Stateless namespace for the WebSocket connection handling functions.
pub struct WsHandler;
318
319impl WsHandler {
320 pub async fn handle_connection(
322 stream: TcpStream,
323 addr: SocketAddr,
324 ctx: Arc<CellServerCtx>,
325 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
326 let mut ws_config = WebSocketConfig::default();
327 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
328 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
329 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
330 Self::handle_upgraded(ws_stream, addr, ctx).await
331 }
332
333 #[allow(clippy::too_many_arguments)]
340 pub async fn handle_upgraded(
341 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
342 addr: SocketAddr,
343 ctx: Arc<CellServerCtx>,
344 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345 ensure_ws_ingest_logger();
346
347 let host_id = ctx.host_id;
348
349 let (mut write, mut read) = ws_stream.split();
350
351 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
354 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
355 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
356 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
357 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
358
359 let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
362
363 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
365 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
366 let writer = ChannelWriter {
367 tx: tx.clone(),
368 deferred_tx: deferred_tx.clone(),
369 drop_logger: drop_logger.clone(),
370 outgoing_format: outgoing_format.clone(),
371 };
372
373 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
375 tx: tx.clone(),
376 deferred_tx: deferred_tx.clone(),
377 drop_logger: drop_logger.clone(),
378 outgoing_format: outgoing_format.clone(),
379 });
380 if let Some(registry) = try_client_registry() {
381 registry.register(client_id.clone(), writer_arc);
382 }
383
384 let mut session = ClientSession::new(client_id.clone(), writer);
385
386 let outgoing_format_writer = outgoing_format.clone();
387 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
388 Arc::new(Mutex::new(HashMap::new()));
389 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
390 Arc::new(Mutex::new(HashMap::new()));
391 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
392 Arc::new(Mutex::new(HashMap::new()));
393 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
394 Arc::new(Mutex::new(HashMap::new()));
395 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
396 Arc::new(Mutex::new(HashMap::new()));
397
398 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
399
400 let client_entity = Client {
402 id: ClientId(client_id.clone()),
403 server_id: host_id.to_string().into(),
404 address: Some(Arc::from(addr.to_string())),
405 windback: None,
406 };
407 if let Err(e) = ctx.set(&client_entity) {
408 log::error!("Failed to persist client entity: {e}");
409 }
410
411 log::info!("Client connected: {} from {}", client_id, addr);
412
413 let write_ctx = ctx.clone();
414 let write_client_id = client_id.clone();
415 let write_addr = addr;
416 let command_ctx = ctx.clone();
417 let command_priority_tx = priority_tx.clone();
418 let command_drop_logger = drop_logger.clone();
419 let command_client_id = client_id.clone();
420
421 let write_task = tokio::spawn(async move {
423 let _ctx = write_ctx;
424 let mut normal_open = true;
425 let mut priority_open = true;
426 let mut deferred_open = true;
427 while normal_open || priority_open || deferred_open {
428 let msg = tokio::select! {
429 biased;
430 maybe = priority_rx.recv(), if priority_open => {
431 match maybe {
432 Some(msg) => OutboundMessage::Message(msg),
433 None => {
434 priority_open = false;
435 continue;
436 }
437 }
438 }
439 maybe = deferred_rx.recv(), if deferred_open => {
440 match maybe {
441 Some(DeferredOutbound::Report(tx, output)) => {
442 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
443 response: output.to_value(),
444 tx: tx.to_string(),
445 }))
446 }
447 Some(DeferredOutbound::Query { response, is_view }) => {
448 if is_view {
449 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
450 } else {
451 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
452 }
453 }
454 None => {
455 deferred_open = false;
456 continue;
457 }
458 }
459 }
460 maybe = rx.recv(), if normal_open => {
461 match maybe {
462 Some(msg) => msg,
463 None => {
464 normal_open = false;
465 continue;
466 }
467 }
468 }
469 };
470 match &msg {
474 OutboundMessage::SerializedCommand { .. } => {
475 crate::ws_timing::record_outbound("Command")
476 }
477 OutboundMessage::Message(m) => {
478 crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
479 }
480 };
481 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
482 OutboundMessage::SerializedCommand { tx, .. } => {
483 ("command", Some(tx.clone()), None, None, None, None)
484 }
485 OutboundMessage::Message(msg) => match msg {
486 MykoMessage::ViewResponse(r) => (
487 "view_response",
488 Some(r.tx.clone()),
489 Some(r.sequence),
490 Some(r.upserts.len()),
491 Some(r.deletes.len()),
492 r.total_count,
493 ),
494 MykoMessage::QueryResponse(r) => (
495 "query_response",
496 Some(r.tx.clone()),
497 Some(r.sequence),
498 Some(r.upserts.len()),
499 Some(r.deletes.len()),
500 r.total_count,
501 ),
502 MykoMessage::CommandResponse(r) => (
503 "command_response",
504 Some(Arc::<str>::from(r.tx.clone())),
505 None,
506 None,
507 None,
508 None,
509 ),
510 MykoMessage::CommandError(r) => (
511 "command_error",
512 Some(Arc::<str>::from(r.tx.clone())),
513 None,
514 None,
515 None,
516 None,
517 ),
518 _ => ("other", None, None, None, None, None),
519 },
520 };
521
522 match &msg {
523 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
524 if !tx.trim().is_empty()
525 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
526 {
527 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
528 }
529 }
530 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
531 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
532 && !tx_id.trim().is_empty()
533 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
534 {
535 map.insert(
536 tx_id.to_string(),
537 (wrapped.command_id.clone(), Instant::now()),
538 );
539 }
540 }
541 _ => {}
542 }
543
544 let ws_msg = match &msg {
545 OutboundMessage::SerializedCommand {
546 payload: EncodedCommandMessage::Json(json),
547 ..
548 } => Message::Text(json.clone().into()),
549 OutboundMessage::SerializedCommand {
550 payload: EncodedCommandMessage::Cbor(bytes),
551 ..
552 } => Message::Binary(bytes.clone().into()),
553 OutboundMessage::Message(msg)
554 if outgoing_format_writer.load(Ordering::SeqCst)
555 == MykoProtocol::CBOR as u8 =>
556 {
557 let mut bytes = Vec::new();
558 match ciborium::ser::into_writer(msg, &mut bytes) {
559 Ok(()) => Message::Binary(bytes.into()),
560 Err(e) => {
561 log::error!("Failed to serialize message to CBOR: {}", e);
562 continue;
563 }
564 }
565 }
566 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
567 Ok(json) => Message::Text(json.into()),
568 Err(e) => {
569 log::error!("Failed to serialize message to JSON: {}", e);
570 continue;
571 }
572 },
573 };
574 let payload_bytes = match &ws_msg {
575 Message::Binary(b) => b.len(),
576 Message::Text(t) => t.len(),
577 _ => 0,
578 };
579
580 if let Err(err) = write.send(ws_msg).await {
581 log::error!(
582 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
583 write_client_id,
584 write_addr,
585 kind,
586 tx_id,
587 seq,
588 payload_bytes,
589 outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
590 err
591 );
592 break;
593 }
594 }
595 if let Some(registry) = try_client_registry() {
598 registry.unregister(&write_client_id);
599 log::info!(
600 "WebSocket writer unregistered client {} from {} (write task exiting)",
601 write_client_id,
602 write_addr,
603 );
604 }
605 log::warn!(
606 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
607 write_client_id,
608 write_addr,
609 normal_open,
610 priority_open,
611 deferred_open
612 );
613 });
614
615 let command_started_cleanup = command_started_by_tx.clone();
618 let command_task = tokio::spawn(async move {
619 while let Some(job) = command_rx.recv().await {
620 let command_ctx = command_ctx.clone();
621 let command_priority_tx = command_priority_tx.clone();
622 let command_drop_logger = command_drop_logger.clone();
623 let command_client_id = command_client_id.clone();
624 let tx_id = job.tx_id.clone();
625 let started_map = command_started_cleanup.clone();
626 match tokio::task::spawn_blocking(move || {
627 Self::execute_command_job(
628 command_ctx,
629 &command_priority_tx,
630 command_drop_logger.as_ref(),
631 command_client_id,
632 job,
633 );
634 })
635 .await
636 {
637 Ok(()) => {}
638 Err(e) => {
639 log::error!("Command worker panicked: {}", e);
640 }
641 }
642 if let Ok(mut map) = started_map.lock() {
644 map.remove(&tx_id);
645 }
646 }
647 });
648
649 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
653 outbound_ttl_interval.tick().await; loop {
655 tokio::select! {
656 Some(ready) = subscribe_rx.recv() => {
658 let tx_id = match &ready {
659 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
660 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
661 };
662 if let Ok(mut map) = subscribe_started_by_tx.lock() {
663 map.remove(&tx_id);
664 }
665 match ready {
666 SubscriptionReady::Query { tx_id, cellmap, window } => {
667 session.subscribe_query(tx_id, cellmap, window);
668 }
669 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
670 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
671 }
672 }
673 }
674 _ = outbound_ttl_interval.tick() => {
678 if let Ok(mut map) = outbound_commands_by_tx.lock() {
679 let before = map.len();
680 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
681 let removed = before - map.len();
682 if removed > 0 {
683 log::debug!(
684 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
685 session.client_id,
686 removed,
687 map.len()
688 );
689 }
690 }
691 }
692 msg = read.next() => {
694 let Some(msg) = msg else { break };
695 let ctx = ctx.clone();
696 let msg = match msg {
697 Ok(m) => m,
698 Err(e) => {
699 log::error!("WebSocket read error from {}: {}", client_id, e);
700 break;
701 }
702 };
703
704 match msg {
705 Message::Binary(data) => {
706 if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
707 log::debug!(
708 "Client {} promoted outgoing format to CBOR via demonstration",
709 client_id
710 );
711 outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
712 }
713
714 match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
715 Ok(myko_msg) => {
716 if let Err(e) = Self::handle_message(
717 &mut session,
718 ctx,
719 &priority_tx,
720 &drop_logger,
721 &query_ids_by_tx,
722 &view_ids_by_tx,
723 &subscribe_started_by_tx,
724 &command_started_by_tx,
725 &outbound_commands_by_tx,
726 &command_tx,
727 &subscribe_tx,
728 myko_msg,
729 ) {
730 log::error!("Error handling message: {}", e);
731 }
732 tokio::task::yield_now().await;
733 }
734 Err(e) => {
735 log::warn!("Failed to parse message from {}: {}", client_id, e);
736 }
737 }
738 }
739 Message::Text(text) => {
740 match serde_json::from_str::<MykoMessage>(&text) {
741 Ok(myko_msg) => {
742 if let Err(e) = Self::handle_message(
743 &mut session,
744 ctx,
745 &priority_tx,
746 &drop_logger,
747 &query_ids_by_tx,
748 &view_ids_by_tx,
749 &subscribe_started_by_tx,
750 &command_started_by_tx,
751 &outbound_commands_by_tx,
752 &command_tx,
753 &subscribe_tx,
754 myko_msg,
755 ) {
756 log::error!("Error handling message: {}", e);
757 }
758 tokio::task::yield_now().await;
759 }
760 Err(e) => {
761 log::warn!(
762 "Failed to parse JSON message from {}: {} | raw: {}",
763 client_id,
764 e,
765 if text.len() > 1000 {
766 &text[..1000]
767 } else {
768 &text
769 }
770 );
771 }
772 }
773 }
774 Message::Ping(data) => {
775 log::trace!("Ping from {}", client_id);
776 let _ = data;
777 }
778 Message::Pong(_) => {
779 log::trace!("Pong from {}", client_id);
780 }
781 Message::Close(frame) => {
782 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
783 break;
784 }
785 Message::Frame(_) => {}
786 }
787 }
788 }
789 }
790
791 write_task.abort();
807 command_task.abort();
808 if let Some(registry) = try_client_registry() {
809 registry.unregister(&client_id);
810 }
811
812 let drop_client_id = client_id.clone();
813 tokio::task::spawn_blocking(move || {
814 drop(session); log::trace!(
816 "Client session subscriptions torn down for {}",
817 drop_client_id
818 );
819 });
820
821 if let Err(e) = ctx.del(&client_entity) {
823 log::error!("Failed to delete client entity: {e}");
824 }
825
826 log::info!("Client disconnected: {} from {}", client_id, addr);
827
828 Ok(())
829 }
830
831 #[allow(clippy::too_many_arguments)]
833 fn handle_message<W: WsWriter>(
834 session: &mut ClientSession<W>,
835 ctx: Arc<CellServerCtx>,
836 priority_tx: &mpsc::Sender<MykoMessage>,
837 drop_logger: &Arc<DropLogger>,
838 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
839 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
840 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
841 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
842 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
843 command_tx: &mpsc::UnboundedSender<CommandJob>,
844 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
845 msg: MykoMessage,
846 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
847 crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
850
851 let handler_registry = ctx.handler_registry.clone();
852
853 let registry = ctx.registry.clone();
854
855 let host_id = ctx.host_id;
856
857 match msg {
858 MykoMessage::Query(wrapped) => {
859 let tx_id: Arc<str> = wrapped
861 .query
862 .get("tx")
863 .and_then(|v| v.as_str())
864 .unwrap_or("unknown")
865 .into();
866 let query_id = &wrapped.query_id;
867 let entity_type = &wrapped.query_item_type;
868
869 if session.has_subscription(&tx_id) {
872 log::debug!(
873 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
874 session.client_id,
875 tx_id,
876 query_id,
877 entity_type
878 );
879 return Ok(());
880 }
881 if let Ok(mut map) = query_ids_by_tx.lock() {
882 map.insert(tx_id.clone(), query_id.clone());
883 }
884 if let Ok(mut map) = subscribe_started_by_tx.lock() {
885 map.entry(tx_id.clone()).or_insert_with(Instant::now);
886 }
887
888 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
889 log::trace!(
890 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
891 session.client_id,
892 tx_id,
893 query_id,
894 entity_type,
895 wrapped.window.is_some(),
896 session.subscription_count()
897 );
898
899 let request_context = Arc::new(RequestContext::from_client(
900 tx_id.clone(),
901 session.client_id.clone(),
902 host_id,
903 ));
904
905 if let Some(query_data) = handler_registry.get_query(query_id) {
906 let parsed = (query_data.parse)(wrapped.query.clone());
907 match parsed {
908 Ok(any_query) => {
909 let cell_factory = query_data.cell_factory;
912 let registry = registry.clone();
913 let request_context = request_context.clone();
914 let ctx = ctx.clone();
915 let window = wrapped.window.clone();
916 let query_id = query_id.clone();
917 let sub_tx = subscribe_tx.clone();
918 tokio::task::spawn_blocking(move || {
919 match cell_factory(any_query, registry, request_context, Some(ctx))
920 {
921 Ok(filtered_cellmap) => {
922 let _ = sub_tx.send(SubscriptionReady::Query {
923 tx_id,
924 cellmap: filtered_cellmap,
925 window,
926 });
927 }
928 Err(e) => {
929 log::error!(
930 "Failed to create query cell for {}: {}",
931 query_id,
932 e
933 );
934 }
935 }
936 });
937 }
938 Err(e) => {
939 log::error!(
940 "Failed to parse query {}: {} | payload: {}",
941 query_id,
942 e,
943 serde_json::to_string(&wrapped.query).unwrap_or_default()
944 );
945 }
946 }
947 } else {
948 log::warn!(
950 "No registered query handler for {}, falling back to select all",
951 query_id
952 );
953 let store: myko::store::EntityStore =
954 (*registry.get_or_create(entity_type)).clone();
955 let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
956 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
957 }
958 }
959
960 MykoMessage::View(wrapped) => {
961 let tx_id: Arc<str> = wrapped
962 .view
963 .get("tx")
964 .and_then(|v| v.as_str())
965 .unwrap_or("unknown")
966 .into();
967 let view_id = &wrapped.view_id;
968 let item_type = &wrapped.view_item_type;
969
970 if session.has_subscription(&tx_id) {
972 log::debug!(
973 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
974 session.client_id,
975 tx_id,
976 view_id,
977 item_type
978 );
979 return Ok(());
980 }
981 if let Ok(mut map) = view_ids_by_tx.lock() {
982 map.insert(tx_id.clone(), view_id.clone());
983 }
984 if let Ok(mut map) = subscribe_started_by_tx.lock() {
985 map.entry(tx_id.clone()).or_insert_with(Instant::now);
986 }
987
988 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
989 log::trace!(
990 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
991 session.client_id,
992 tx_id,
993 view_id,
994 item_type,
995 wrapped.window
996 );
997
998 let request_context = Arc::new(RequestContext::from_client(
999 tx_id.clone(),
1000 session.client_id.clone(),
1001 host_id,
1002 ));
1003
1004 if let Some(view_data) = handler_registry.get_view(view_id) {
1005 let parsed = (view_data.parse)(wrapped.view.clone());
1006 match parsed {
1007 Ok(any_view) => {
1008 log::trace!(
1009 "View parsed successfully client={} tx={} view_id={}",
1010 session.client_id,
1011 tx_id,
1012 view_id
1013 );
1014 let cell_factory = view_data.cell_factory;
1017 let registry = registry.clone();
1018 let ctx = ctx.clone();
1019 let window = wrapped.window.clone();
1020 let view_id_clone = view_id.clone();
1021 let sub_tx = subscribe_tx.clone();
1022 let priority_tx = priority_tx.clone();
1023 let drop_logger = drop_logger.clone();
1024 tokio::task::spawn_blocking(move || {
1025 match cell_factory(any_view, registry, request_context, ctx) {
1026 Ok(filtered_cellmap) => {
1027 log::trace!(
1028 "View cell factory succeeded tx={} view_id={}",
1029 tx_id,
1030 view_id_clone
1031 );
1032 let _ = sub_tx.send(SubscriptionReady::View {
1033 tx_id,
1034 view_id: view_id_clone,
1035 cellmap: filtered_cellmap,
1036 window,
1037 });
1038 }
1039 Err(e) => {
1040 log::error!(
1041 "Failed to create view cell for {}: {}",
1042 view_id_clone,
1043 e
1044 );
1045 if let Err(err) = priority_tx.try_send(
1046 MykoMessage::ViewError(ViewError {
1047 tx: tx_id.to_string(),
1048 view_id: view_id_clone.to_string(),
1049 message: e,
1050 }),
1051 ) {
1052 drop_logger.on_drop("ViewError", &err);
1053 }
1054 }
1055 }
1056 });
1057 }
1058 Err(e) => {
1059 let message = format!("Failed to parse view {}: {}", view_id, e);
1060 log::error!(
1061 "{} | payload: {}",
1062 message,
1063 serde_json::to_string(&wrapped.view).unwrap_or_default()
1064 );
1065 if let Err(err) =
1066 priority_tx.try_send(MykoMessage::ViewError(ViewError {
1067 tx: tx_id.to_string(),
1068 view_id: view_id.to_string(),
1069 message,
1070 }))
1071 {
1072 drop_logger.on_drop("ViewError", &err);
1073 }
1074 }
1075 }
1076 } else {
1077 let message = format!("No registered handler for view: {}", view_id);
1078 log::warn!("{}", message);
1079 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1080 tx: tx_id.to_string(),
1081 view_id: view_id.to_string(),
1082 message,
1083 })) {
1084 drop_logger.on_drop("ViewError", &err);
1085 }
1086 }
1087 }
1088
1089 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1090 log::trace!(
1091 "QueryCancel received: client={} tx={}",
1092 session.client_id,
1093 tx_id
1094 );
1095 let tx_id: Arc<str> = tx_id.into();
1096 if let Ok(mut map) = query_ids_by_tx.lock() {
1097 map.remove(&tx_id);
1098 }
1099 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1100 map.remove(&tx_id);
1101 }
1102 session.cancel(&tx_id);
1103 }
1104
1105 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1106 let tx_id: Arc<str> = tx.into();
1107 log::trace!(
1108 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1109 session.client_id,
1110 tx_id,
1111 window.is_some(),
1112 session.subscription_count()
1113 );
1114 session.update_query_window(&tx_id, window);
1115 }
1116 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1117 log::trace!("View cancel: {}", tx_id);
1118 let tx_id: Arc<str> = tx_id.into();
1119 if let Ok(mut map) = view_ids_by_tx.lock() {
1120 map.remove(&tx_id);
1121 }
1122 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1123 map.remove(&tx_id);
1124 }
1125 session.cancel(&tx_id);
1126 }
1127 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1128 let tx_id: Arc<str> = tx.into();
1129 log::trace!("View window update: {}", tx_id);
1130 session.update_view_window(&tx_id, window);
1131 }
1132
1133 MykoMessage::Report(wrapped) => {
1134 let tx_id: Arc<str> = wrapped
1136 .report
1137 .get("tx")
1138 .and_then(|v| v.as_str())
1139 .unwrap_or("unknown")
1140 .into();
1141 let report_id = &wrapped.report_id;
1142
1143 log::trace!(
1144 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1145 session.client_id,
1146 tx_id,
1147 report_id,
1148 session.subscription_count()
1149 );
1150
1151 if let Some(report_data) = handler_registry.get_report(report_id) {
1153 let parsed = (report_data.parse)(wrapped.report.clone());
1155 match parsed {
1156 Ok(any_report) => {
1157 let request_context = Arc::new(RequestContext::from_client(
1158 tx_id.clone(),
1159 session.client_id.clone(),
1160 host_id,
1161 ));
1162
1163 match (report_data.cell_factory)(any_report, request_context, ctx) {
1165 Ok(cell) => {
1166 session.subscribe_report(
1167 tx_id,
1168 report_id.as_str().into(),
1169 cell,
1170 );
1171 }
1172 Err(e) => {
1173 log::error!(
1174 "Failed to create report cell for {}: {}",
1175 report_id,
1176 e
1177 );
1178 }
1179 }
1180 }
1181 Err(e) => {
1182 log::error!(
1183 "Failed to parse report {}: {} | payload: {}",
1184 report_id,
1185 e,
1186 serde_json::to_string(&wrapped.report).unwrap_or_default()
1187 );
1188 }
1189 }
1190 } else {
1191 log::warn!("No registered handler for report: {}", report_id);
1192 }
1193 }
1194
1195 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1196 log::trace!(
1197 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1198 session.client_id,
1199 tx_id,
1200 session.subscription_count()
1201 );
1202 session.cancel(&tx_id.into());
1203 }
1204
1205 MykoMessage::Event(mut event) => {
1206 record_ws_ingest(std::slice::from_ref(&event));
1207 event.sanitize_null_bytes();
1208 normalize_incoming_event(&mut event, &session.client_id, host_id);
1209 if let Err(e) = ctx.apply_event(event) {
1210 log::error!(
1211 "Failed to apply event from client {}: {e}",
1212 session.client_id
1213 );
1214 }
1215 }
1216
1217 MykoMessage::EventBatch(mut events) => {
1218 record_ws_ingest(&events);
1219 let incoming = events.len();
1220 if incoming >= 64 {
1221 log::trace!(
1222 "Received event batch from client {} size={}",
1223 session.client_id,
1224 incoming
1225 );
1226 }
1227 for event in &mut events {
1228 event.sanitize_null_bytes();
1229 normalize_incoming_event(event, &session.client_id, host_id);
1230 }
1231 match ctx.apply_event_batch(events) {
1232 Ok(applied) => {
1233 log::trace!(
1234 "Applied event batch from client {} size={}",
1235 session.client_id,
1236 applied
1237 );
1238 }
1239 Err(e) => {
1240 log::error!(
1241 "Failed to apply event batch from client {}: {}",
1242 session.client_id,
1243 e
1244 );
1245 }
1246 }
1247 }
1248
1249 MykoMessage::Command(wrapped) => {
1250 let tx_id: Arc<str> = wrapped
1252 .command
1253 .get("tx")
1254 .and_then(|v| v.as_str())
1255 .unwrap_or("unknown")
1256 .into();
1257
1258 let command_id = &wrapped.command_id;
1259
1260 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1261 let received_at = Instant::now();
1262 if let Ok(mut map) = command_started_by_tx.lock() {
1263 map.insert(tx_id.clone(), received_at);
1264 }
1265 if let Err(e) = command_tx.send(CommandJob {
1266 tx_id: tx_id.clone(),
1267 command_id: wrapped.command_id.clone(),
1268 command: wrapped.command.clone(),
1269 received_at,
1270 }) {
1271 log::error!(
1272 "Failed to enqueue command {} for client {} tx={}: {}",
1273 command_id,
1274 session.client_id,
1275 tx_id,
1276 e
1277 );
1278 let error = MykoMessage::CommandError(CommandError {
1279 tx: tx_id.to_string(),
1280 command_id: command_id.to_string(),
1281 message: "Command queue unavailable".to_string(),
1282 });
1283 if let Err(err) = priority_tx.try_send(error) {
1284 drop_logger.on_drop("CommandError", &err);
1285 }
1286 }
1287 }
1288
1289 MykoMessage::Ping(PingData { id, timestamp }) => {
1290 let pong = MykoMessage::Ping(PingData { id, timestamp });
1292 if let Err(e) = priority_tx.try_send(pong) {
1293 drop_logger.on_drop("Ping", &e);
1294 }
1295 }
1296
1297 MykoMessage::QueryResponse(resp) => {
1299 log::warn!(
1300 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1301 session.client_id,
1302 resp.tx,
1303 resp.sequence,
1304 resp.upserts.len(),
1305 resp.deletes.len(),
1306 session.subscription_count()
1307 );
1308 }
1309 MykoMessage::QueryError(err) => {
1310 log::warn!(
1311 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1312 session.client_id,
1313 err.tx,
1314 err.query_id,
1315 err.message,
1316 session.subscription_count()
1317 );
1318 }
1319 MykoMessage::ViewResponse(resp) => {
1320 log::warn!(
1321 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1322 session.client_id,
1323 resp.tx,
1324 resp.sequence,
1325 resp.upserts.len(),
1326 resp.deletes.len(),
1327 session.subscription_count()
1328 );
1329 }
1330 MykoMessage::ViewError(err) => {
1331 log::warn!(
1332 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1333 session.client_id,
1334 err.tx,
1335 err.view_id,
1336 err.message,
1337 session.subscription_count()
1338 );
1339 }
1340 MykoMessage::ReportResponse(resp) => {
1341 log::warn!(
1342 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1343 session.client_id,
1344 resp.tx,
1345 session.subscription_count()
1346 );
1347 }
1348 MykoMessage::ReportError(err) => {
1349 log::warn!(
1350 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1351 session.client_id,
1352 err.tx,
1353 err.report_id,
1354 err.message,
1355 session.subscription_count()
1356 );
1357 }
1358 MykoMessage::CommandResponse(resp) => {
1359 if resp.tx.trim().is_empty() {
1360 log::warn!(
1361 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1362 session.client_id,
1363 session.subscription_count()
1364 );
1365 } else {
1366 let correlated = outbound_commands_by_tx
1367 .lock()
1368 .ok()
1369 .and_then(|mut map| map.remove(&resp.tx));
1370 if let Some((command_id, started)) = correlated {
1371 log::debug!(
1372 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1373 session.client_id,
1374 resp.tx,
1375 command_id,
1376 started.elapsed().as_millis(),
1377 session.subscription_count()
1378 );
1379 } else {
1380 log::warn!(
1381 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1382 session.client_id,
1383 resp.tx,
1384 session.subscription_count()
1385 );
1386 }
1387 }
1388 }
1389 MykoMessage::CommandError(err) => {
1390 if err.tx.trim().is_empty() {
1391 log::warn!(
1392 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1393 session.client_id,
1394 err.command_id,
1395 err.message,
1396 session.subscription_count()
1397 );
1398 } else {
1399 let correlated = outbound_commands_by_tx
1400 .lock()
1401 .ok()
1402 .and_then(|mut map| map.remove(&err.tx));
1403 if let Some((command_id, started)) = correlated {
1404 log::warn!(
1405 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1406 session.client_id,
1407 err.tx,
1408 err.command_id,
1409 command_id,
1410 err.message,
1411 started.elapsed().as_millis(),
1412 session.subscription_count()
1413 );
1414 } else {
1415 log::warn!(
1416 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1417 session.client_id,
1418 err.tx,
1419 err.command_id,
1420 err.message,
1421 session.subscription_count()
1422 );
1423 }
1424 }
1425 }
1426 MykoMessage::Benchmark(payload) => {
1427 let stats = ws_benchmark_stats();
1428 ensure_ws_benchmark_logger();
1429 stats.message_count.fetch_add(1, Ordering::Relaxed);
1430 let size = payload.to_string().len() as u64;
1432 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1433 }
1434 }
1435
1436 Ok(())
1437 }
1438
1439 fn execute_command_job(
1440 ctx: Arc<CellServerCtx>,
1441 priority_tx: &mpsc::Sender<MykoMessage>,
1442 drop_logger: &DropLogger,
1443 client_id: Arc<str>,
1444 job: CommandJob,
1445 ) {
1446 let host_id = ctx.host_id;
1447 let started = Instant::now();
1448 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1449 let command_id = job.command_id.clone();
1450
1451 let mut handler_found = false;
1452 for registration in inventory::iter::<CommandHandlerRegistration> {
1453 if registration.command_id == command_id {
1454 handler_found = true;
1455 let executor = (registration.factory)();
1456
1457 let req = Arc::new(RequestContext::from_client(
1458 job.tx_id.clone(),
1459 client_id.clone(),
1460 host_id,
1461 ));
1462 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1463 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1464 let execute_started = Instant::now();
1465
1466 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1467 Ok(result) => {
1468 let response = MykoMessage::CommandResponse(CommandResponse {
1469 response: result,
1470 tx: job.tx_id.to_string(),
1471 });
1472 if let Err(e) = priority_tx.try_send(response) {
1473 drop_logger.on_drop("CommandResponse", &e);
1474 }
1475 }
1476 Err(e) => {
1477 let error = MykoMessage::CommandError(CommandError {
1478 tx: job.tx_id.to_string(),
1479 command_id: command_id.clone(),
1480 message: e.message,
1481 });
1482 if let Err(err) = priority_tx.try_send(error) {
1483 drop_logger.on_drop("CommandError", &err);
1484 }
1485 }
1486 }
1487 let execute_ms = execute_started.elapsed().as_millis();
1488 let total_ms = job.received_at.elapsed().as_millis();
1489 log::trace!(
1490 target: "myko_server::ws_perf",
1491 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1492 client_id,
1493 job.tx_id,
1494 command_id,
1495 queue_wait_ms,
1496 execute_ms,
1497 total_ms
1498 );
1499 break;
1500 }
1501 }
1502
1503 if !handler_found {
1504 log::warn!("No registered handler for command: {}", command_id);
1505 let error = MykoMessage::CommandError(CommandError {
1506 tx: job.tx_id.to_string(),
1507 command_id: command_id.clone(),
1508 message: format!("Command handler not found: {}", command_id),
1509 });
1510 if let Err(e) = priority_tx.try_send(error) {
1511 drop_logger.on_drop("CommandError", &e);
1512 }
1513 }
1514
1515 if !handler_found {
1516 log::debug!(
1517 target: "myko_server::ws_perf",
1518 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1519 client_id,
1520 job.tx_id,
1521 command_id,
1522 queue_wait_ms,
1523 job.received_at.elapsed().as_millis()
1524 );
1525 }
1526 }
1527}
1528
/// `WsWriter` implementation that forwards outgoing traffic onto `mpsc` channels.
///
/// Plain messages and pre-serialized commands go through `tx`; report and
/// query/view responses go through `deferred_tx`.
struct ChannelWriter {
    /// Sender for `OutboundMessage` values (`Message` and `SerializedCommand`).
    tx: mpsc::Sender<OutboundMessage>,
    /// Sender for `DeferredOutbound` values (`Report` and `Query`).
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Notified when a `try_send` on either channel fails for a reason other
    /// than the channel being closed.
    drop_logger: Arc<DropLogger>,
    /// Protocol discriminant stored as a `u8`; `protocol()` loads it and
    /// converts it to `MykoProtocol`.
    outgoing_format: Arc<AtomicU8>,
}
1539
impl ChannelWriter {
    /// Returns `true` when the `tx` channel reports itself closed.
    ///
    /// Used as a cheap pre-check so senders can return before attempting a send.
    #[inline]
    fn tx_dead(&self) -> bool {
        self.tx.is_closed()
    }

    /// Returns `true` when the `deferred_tx` channel reports itself closed.
    #[inline]
    fn deferred_dead(&self) -> bool {
        self.deferred_tx.is_closed()
    }
}
1558
1559impl WsWriter for ChannelWriter {
1560 fn send(&self, msg: MykoMessage) {
1561 if self.tx_dead() {
1568 return;
1569 }
1570 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1571 if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1574 self.drop_logger.on_drop("message", &e);
1575 }
1576 }
1577 }
1578
1579 fn protocol(&self) -> MykoProtocol {
1580 MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1581 }
1582
1583 fn send_serialized_command(
1584 &self,
1585 tx: Arc<str>,
1586 command_id: String,
1587 payload: EncodedCommandMessage,
1588 ) {
1589 if self.tx_dead() {
1590 return;
1591 }
1592 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1593 tx,
1594 command_id,
1595 payload,
1596 }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1597 {
1598 self.drop_logger.on_drop("serialized_command", &e);
1599 }
1600 }
1601
1602 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1603 if self.deferred_dead() {
1604 return;
1605 }
1606 if let Err(e) = self
1607 .deferred_tx
1608 .try_send(DeferredOutbound::Report(tx, output))
1609 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1610 {
1611 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1612 }
1613 }
1614
1615 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1616 if self.deferred_dead() {
1617 return;
1618 }
1619 if let Err(e) = self
1620 .deferred_tx
1621 .try_send(DeferredOutbound::Query { response, is_view })
1622 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1623 {
1624 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1625 }
1626 }
1627}
1628
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` shows up on the `tx` receiver
    /// wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (tx, mut rx) = mpsc::channel(10);
        // The deferred receiver is kept alive but never read in this test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format: Arc::new(AtomicU8::new(MykoProtocol::JSON as u8)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let received = rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    /// The `u8` stored in the format atomic round-trips through `MykoProtocol::from`
    /// for both JSON and CBOR.
    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        use std::sync::atomic::{AtomicU8, Ordering};

        let outgoing_format = AtomicU8::new(MykoProtocol::JSON as u8);
        let current = || MykoProtocol::from(outgoing_format.load(Ordering::SeqCst));

        // Initial value decodes as JSON.
        assert_eq!(current(), MykoProtocol::JSON);

        // After storing the CBOR discriminant, loads decode as CBOR.
        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(current(), MykoProtocol::CBOR);

        // A second load observes the same value.
        assert_eq!(current(), MykoProtocol::CBOR);
    }
}