1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use futures_util::{SinkExt, StreamExt};
17use hyphae::SelectExt;
18use myko::{
19 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
20 client::MykoProtocol,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{
24 iter_client_id_registrations, iter_fallback_to_id_registrations,
25 iter_server_owned_registrations,
26 },
27 report::AnyOutput,
28 request::RequestContext,
29 server::{
30 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
31 client_registry::try_client_registry,
32 },
33 wire::{
34 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
35 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
36 },
37};
38use tokio::{net::TcpStream, sync::mpsc, time::interval};
39use tokio_tungstenite::{
40 accept_async_with_config,
41 tungstenite::{Message, protocol::WebSocketConfig},
42};
43use uuid::Uuid;
44
/// Process-wide WebSocket throughput counters.
///
/// Both fields are read and reset to zero once per second by the benchmark
/// logger thread, so at any moment they hold the totals accumulated since the
/// previous report.
///
/// `Default` yields a fully zeroed instance; `Debug` is derived so the counters
/// can be inspected in diagnostics.
#[derive(Debug, Default)]
struct WsBenchmarkStats {
    /// Number of messages counted since the last report.
    message_count: AtomicU64,
    /// Number of payload bytes counted since the last report.
    total_bytes: AtomicU64,
}
49
/// Lazily created, process-wide benchmark counters; populated on first use by
/// `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Set to `true` by the first caller of `ensure_ws_benchmark_logger` so that
/// only one logger thread is ever started.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
52
53fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
54 WS_BENCHMARK_STATS
55 .get_or_init(|| {
56 Arc::new(WsBenchmarkStats {
57 message_count: AtomicU64::new(0),
58 total_bytes: AtomicU64::new(0),
59 })
60 })
61 .clone()
62}
63
64fn ensure_ws_benchmark_logger() {
65 if WS_BENCHMARK_LOGGER_STARTED
66 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
67 .is_err()
68 {
69 return;
70 }
71
72 let stats = ws_benchmark_stats();
73 thread::Builder::new()
74 .name("ws-benchmark-logger".into())
75 .spawn(move || {
76 loop {
77 thread::sleep(Duration::from_secs(1));
78
79 let count = stats.message_count.swap(0, Ordering::Relaxed);
80 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
81
82 if count == 0 {
83 continue;
84 }
85
86 log::info!(
87 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
88 count,
89 bytes,
90 bytes / count
91 );
92 }
93 })
94 .expect("failed to spawn websocket benchmark logger thread");
95}
96
97fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
98 if event.change_type != MEventType::SET {
99 return;
100 }
101
102 for reg in iter_client_id_registrations() {
104 if reg.entity_type == event.item_type {
105 if let Some(obj) = event.item.as_object_mut() {
106 obj.insert(
107 reg.field_name_json.to_string(),
108 serde_json::Value::String(client_id.to_string()),
109 );
110 }
111 break;
112 }
113 }
114
115 for reg in iter_server_owned_registrations() {
117 if reg.entity_type == event.item_type {
118 if let Some(obj) = event.item.as_object_mut() {
119 let field = reg.field_name_json;
120 let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
121 if current.is_empty() {
122 obj.insert(
123 field.to_string(),
124 serde_json::Value::String(host_id.to_string()),
125 );
126 }
127 }
128 break;
129 }
130 }
131
132 if let Some(obj) = event.item.as_object_mut()
135 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
136 {
137 for reg in iter_fallback_to_id_registrations() {
138 if reg.entity_type == event.item_type {
139 let field = reg.field_name_json;
140 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
141 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
142 }
143 }
144 }
145 }
146}
147
/// Rate-limited reporter for outbound messages that could not be queued.
///
/// Counts drops continuously but emits at most one warning per second per
/// client (see `on_drop`).
struct DropLogger {
    /// Id of the client whose messages are being dropped; used in the log line.
    client_id: Arc<str>,
    /// Drops counted since the last warning was emitted.
    dropped: AtomicU64,
    /// Wall-clock time of the last warning, in milliseconds since the Unix epoch
    /// (0 until the first warning).
    last_log_ms: AtomicU64,
}
157
158impl DropLogger {
159 fn new(client_id: Arc<str>) -> Self {
160 Self {
161 client_id,
162 dropped: std::sync::atomic::AtomicU64::new(0),
163 last_log_ms: std::sync::atomic::AtomicU64::new(0),
164 }
165 }
166
167 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
168 use std::sync::atomic::Ordering;
169
170 self.dropped.fetch_add(1, Ordering::Relaxed);
171
172 let now_ms = std::time::SystemTime::now()
174 .duration_since(std::time::UNIX_EPOCH)
175 .unwrap_or_default()
176 .as_millis() as u64;
177 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
178 if now_ms.saturating_sub(last_ms) < 1000 {
179 return;
180 }
181
182 if self
183 .last_log_ms
184 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
185 .is_err()
186 {
187 return;
188 }
189
190 let n = self.dropped.swap(0, Ordering::Relaxed);
191 log::warn!(
192 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
193 n,
194 self.client_id,
195 kind,
196 err
197 );
198 }
199}
200
/// A client command queued for execution by the per-connection command task.
struct CommandJob {
    /// Transaction id read from the command payload's `tx` field.
    tx_id: Arc<str>,
    /// Identifier of the command handler to run.
    command_id: String,
    /// Raw JSON payload of the command as received from the client.
    command: serde_json::Value,
    /// Moment the command was received and enqueued.
    received_at: Instant,
}
207
/// Result of building a subscription's cell map on a blocking worker.
///
/// Sent over the connection's subscribe channel and consumed by the read loop,
/// which attaches the cell map to the client session.
enum SubscriptionReady {
    /// A query subscription is ready to be attached.
    Query {
        /// Transaction id of the subscribing request.
        tx_id: Arc<str>,
        /// Cell map produced by the query's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the client's request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view subscription is ready to be attached.
    View {
        /// Transaction id of the subscribing request.
        tx_id: Arc<str>,
        /// Identifier of the view being subscribed to.
        view_id: Arc<str>,
        /// Cell map produced by the view's cell factory.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window carried over from the client's request.
        window: Option<myko::wire::QueryWindow>,
    },
}
222
/// A message travelling over the normal outbound channel to the writer task.
enum OutboundMessage {
    /// A protocol message that the writer task serializes (CBOR or JSON,
    /// depending on the connection's current outgoing format) just before sending.
    Message(MykoMessage),
    /// A command whose payload has already been encoded.
    SerializedCommand {
        /// Transaction id; recorded by the writer task so a later client
        /// response can be correlated with this command.
        tx: Arc<str>,
        /// Identifier of the command being sent.
        command_id: String,
        /// Pre-encoded payload: JSON is sent as a text frame, CBOR as a binary frame.
        payload: EncodedCommandMessage,
    },
}
231
/// Outbound payloads whose conversion to a wire message is postponed until the
/// writer task dequeues them.
enum DeferredOutbound {
    /// A report output for the given transaction id; the writer task turns it
    /// into a `ReportResponse` via `to_value()`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// A pending query or view response; the writer task calls `into_wire()` on it.
    Query {
        /// The response awaiting conversion to its wire form.
        response: PendingQueryResponse,
        /// `true` to send as a `ViewResponse`, `false` to send as a `QueryResponse`.
        is_view: bool,
    },
}
239
/// Stateless entry point for serving WebSocket client connections; all
/// functionality lives in associated functions.
pub struct WsHandler;
242
243impl WsHandler {
244 pub async fn handle_connection(
246 stream: TcpStream,
247 addr: SocketAddr,
248 ctx: Arc<CellServerCtx>,
249 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
250 let mut ws_config = WebSocketConfig::default();
251 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
252 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
253 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
254 Self::handle_upgraded(ws_stream, addr, ctx).await
255 }
256
257 #[allow(clippy::too_many_arguments)]
264 pub async fn handle_upgraded(
265 ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
266 addr: SocketAddr,
267 ctx: Arc<CellServerCtx>,
268 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
269 let host_id = ctx.host_id;
270
271 let (mut write, mut read) = ws_stream.split();
272
273 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
276 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
277 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
278 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
279 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
280
281 let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
284
285 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
287 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
288 let writer = ChannelWriter {
289 tx: tx.clone(),
290 deferred_tx: deferred_tx.clone(),
291 drop_logger: drop_logger.clone(),
292 outgoing_format: outgoing_format.clone(),
293 };
294
295 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
297 tx: tx.clone(),
298 deferred_tx: deferred_tx.clone(),
299 drop_logger: drop_logger.clone(),
300 outgoing_format: outgoing_format.clone(),
301 });
302 if let Some(registry) = try_client_registry() {
303 registry.register(client_id.clone(), writer_arc);
304 }
305
306 let mut session = ClientSession::new(client_id.clone(), writer);
307
308 let outgoing_format_writer = outgoing_format.clone();
309 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
310 Arc::new(Mutex::new(HashMap::new()));
311 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
312 Arc::new(Mutex::new(HashMap::new()));
313 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
314 Arc::new(Mutex::new(HashMap::new()));
315 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
316 Arc::new(Mutex::new(HashMap::new()));
317 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
318 Arc::new(Mutex::new(HashMap::new()));
319
320 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
321
322 let client_entity = Client {
324 id: ClientId(client_id.clone()),
325 server_id: host_id.to_string().into(),
326 address: Some(Arc::from(addr.to_string())),
327 windback: None,
328 };
329 if let Err(e) = ctx.set(&client_entity) {
330 log::error!("Failed to persist client entity: {e}");
331 }
332
333 log::info!("Client connected: {} from {}", client_id, addr);
334
335 let write_ctx = ctx.clone();
336 let write_client_id = client_id.clone();
337 let write_addr = addr;
338 let command_ctx = ctx.clone();
339 let command_priority_tx = priority_tx.clone();
340 let command_drop_logger = drop_logger.clone();
341 let command_client_id = client_id.clone();
342
343 let write_task = tokio::spawn(async move {
345 let _ctx = write_ctx;
346 let mut normal_open = true;
347 let mut priority_open = true;
348 let mut deferred_open = true;
349 while normal_open || priority_open || deferred_open {
350 let msg = tokio::select! {
351 biased;
352 maybe = priority_rx.recv(), if priority_open => {
353 match maybe {
354 Some(msg) => OutboundMessage::Message(msg),
355 None => {
356 priority_open = false;
357 continue;
358 }
359 }
360 }
361 maybe = deferred_rx.recv(), if deferred_open => {
362 match maybe {
363 Some(DeferredOutbound::Report(tx, output)) => {
364 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
365 response: output.to_value(),
366 tx: tx.to_string(),
367 }))
368 }
369 Some(DeferredOutbound::Query { response, is_view }) => {
370 if is_view {
371 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
372 } else {
373 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
374 }
375 }
376 None => {
377 deferred_open = false;
378 continue;
379 }
380 }
381 }
382 maybe = rx.recv(), if normal_open => {
383 match maybe {
384 Some(msg) => msg,
385 None => {
386 normal_open = false;
387 continue;
388 }
389 }
390 }
391 };
392 match &msg {
396 OutboundMessage::SerializedCommand { .. } => {
397 crate::ws_timing::record_outbound("Command")
398 }
399 OutboundMessage::Message(m) => {
400 crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
401 }
402 };
403 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
404 OutboundMessage::SerializedCommand { tx, .. } => {
405 ("command", Some(tx.clone()), None, None, None, None)
406 }
407 OutboundMessage::Message(msg) => match msg {
408 MykoMessage::ViewResponse(r) => (
409 "view_response",
410 Some(r.tx.clone()),
411 Some(r.sequence),
412 Some(r.upserts.len()),
413 Some(r.deletes.len()),
414 r.total_count,
415 ),
416 MykoMessage::QueryResponse(r) => (
417 "query_response",
418 Some(r.tx.clone()),
419 Some(r.sequence),
420 Some(r.upserts.len()),
421 Some(r.deletes.len()),
422 r.total_count,
423 ),
424 MykoMessage::CommandResponse(r) => (
425 "command_response",
426 Some(Arc::<str>::from(r.tx.clone())),
427 None,
428 None,
429 None,
430 None,
431 ),
432 MykoMessage::CommandError(r) => (
433 "command_error",
434 Some(Arc::<str>::from(r.tx.clone())),
435 None,
436 None,
437 None,
438 None,
439 ),
440 _ => ("other", None, None, None, None, None),
441 },
442 };
443
444 match &msg {
445 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
446 if !tx.trim().is_empty()
447 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
448 {
449 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
450 }
451 }
452 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
453 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
454 && !tx_id.trim().is_empty()
455 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
456 {
457 map.insert(
458 tx_id.to_string(),
459 (wrapped.command_id.clone(), Instant::now()),
460 );
461 }
462 }
463 _ => {}
464 }
465
466 let ws_msg = match &msg {
467 OutboundMessage::SerializedCommand {
468 payload: EncodedCommandMessage::Json(json),
469 ..
470 } => Message::Text(json.clone().into()),
471 OutboundMessage::SerializedCommand {
472 payload: EncodedCommandMessage::Cbor(bytes),
473 ..
474 } => Message::Binary(bytes.clone().into()),
475 OutboundMessage::Message(msg)
476 if outgoing_format_writer.load(Ordering::SeqCst)
477 == MykoProtocol::CBOR as u8 =>
478 {
479 let mut bytes = Vec::new();
480 match ciborium::ser::into_writer(msg, &mut bytes) {
481 Ok(()) => Message::Binary(bytes.into()),
482 Err(e) => {
483 log::error!("Failed to serialize message to CBOR: {}", e);
484 continue;
485 }
486 }
487 }
488 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
489 Ok(json) => Message::Text(json.into()),
490 Err(e) => {
491 log::error!("Failed to serialize message to JSON: {}", e);
492 continue;
493 }
494 },
495 };
496 let payload_bytes = match &ws_msg {
497 Message::Binary(b) => b.len(),
498 Message::Text(t) => t.len(),
499 _ => 0,
500 };
501
502 if let Err(err) = write.send(ws_msg).await {
503 log::error!(
504 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
505 write_client_id,
506 write_addr,
507 kind,
508 tx_id,
509 seq,
510 payload_bytes,
511 outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
512 err
513 );
514 break;
515 }
516 }
517 if let Some(registry) = try_client_registry() {
520 registry.unregister(&write_client_id);
521 log::info!(
522 "WebSocket writer unregistered client {} from {} (write task exiting)",
523 write_client_id,
524 write_addr,
525 );
526 }
527 log::warn!(
528 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
529 write_client_id,
530 write_addr,
531 normal_open,
532 priority_open,
533 deferred_open
534 );
535 });
536
537 let command_started_cleanup = command_started_by_tx.clone();
540 let command_task = tokio::spawn(async move {
541 while let Some(job) = command_rx.recv().await {
542 let command_ctx = command_ctx.clone();
543 let command_priority_tx = command_priority_tx.clone();
544 let command_drop_logger = command_drop_logger.clone();
545 let command_client_id = command_client_id.clone();
546 let tx_id = job.tx_id.clone();
547 let started_map = command_started_cleanup.clone();
548 match tokio::task::spawn_blocking(move || {
549 Self::execute_command_job(
550 command_ctx,
551 &command_priority_tx,
552 command_drop_logger.as_ref(),
553 command_client_id,
554 job,
555 );
556 })
557 .await
558 {
559 Ok(()) => {}
560 Err(e) => {
561 log::error!("Command worker panicked: {}", e);
562 }
563 }
564 if let Ok(mut map) = started_map.lock() {
566 map.remove(&tx_id);
567 }
568 }
569 });
570
571 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
575 outbound_ttl_interval.tick().await; loop {
577 tokio::select! {
578 Some(ready) = subscribe_rx.recv() => {
580 let tx_id = match &ready {
581 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
582 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
583 };
584 if let Ok(mut map) = subscribe_started_by_tx.lock() {
585 map.remove(&tx_id);
586 }
587 match ready {
588 SubscriptionReady::Query { tx_id, cellmap, window } => {
589 session.subscribe_query(tx_id, cellmap, window);
590 }
591 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
592 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
593 }
594 }
595 }
596 _ = outbound_ttl_interval.tick() => {
600 if let Ok(mut map) = outbound_commands_by_tx.lock() {
601 let before = map.len();
602 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
603 let removed = before - map.len();
604 if removed > 0 {
605 log::debug!(
606 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
607 session.client_id,
608 removed,
609 map.len()
610 );
611 }
612 }
613 }
614 msg = read.next() => {
616 let Some(msg) = msg else { break };
617 let ctx = ctx.clone();
618 let msg = match msg {
619 Ok(m) => m,
620 Err(e) => {
621 log::error!("WebSocket read error from {}: {}", client_id, e);
622 break;
623 }
624 };
625
626 match msg {
627 Message::Binary(data) => {
628 if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
629 log::debug!(
630 "Client {} promoted outgoing format to CBOR via demonstration",
631 client_id
632 );
633 outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
634 }
635
636 match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
637 Ok(myko_msg) => {
638 if let Err(e) = Self::handle_message(
639 &mut session,
640 ctx,
641 &priority_tx,
642 &drop_logger,
643 &query_ids_by_tx,
644 &view_ids_by_tx,
645 &subscribe_started_by_tx,
646 &command_started_by_tx,
647 &outbound_commands_by_tx,
648 &command_tx,
649 &subscribe_tx,
650 myko_msg,
651 ) {
652 log::error!("Error handling message: {}", e);
653 }
654 tokio::task::yield_now().await;
655 }
656 Err(e) => {
657 log::warn!("Failed to parse message from {}: {}", client_id, e);
658 }
659 }
660 }
661 Message::Text(text) => {
662 match serde_json::from_str::<MykoMessage>(&text) {
663 Ok(myko_msg) => {
664 if let Err(e) = Self::handle_message(
665 &mut session,
666 ctx,
667 &priority_tx,
668 &drop_logger,
669 &query_ids_by_tx,
670 &view_ids_by_tx,
671 &subscribe_started_by_tx,
672 &command_started_by_tx,
673 &outbound_commands_by_tx,
674 &command_tx,
675 &subscribe_tx,
676 myko_msg,
677 ) {
678 log::error!("Error handling message: {}", e);
679 }
680 tokio::task::yield_now().await;
681 }
682 Err(e) => {
683 log::warn!(
684 "Failed to parse JSON message from {}: {} | raw: {}",
685 client_id,
686 e,
687 if text.len() > 1000 {
688 &text[..1000]
689 } else {
690 &text
691 }
692 );
693 }
694 }
695 }
696 Message::Ping(data) => {
697 log::trace!("Ping from {}", client_id);
698 let _ = data;
699 }
700 Message::Pong(_) => {
701 log::trace!("Pong from {}", client_id);
702 }
703 Message::Close(frame) => {
704 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
705 break;
706 }
707 Message::Frame(_) => {}
708 }
709 }
710 }
711 }
712
713 write_task.abort();
729 command_task.abort();
730 if let Some(registry) = try_client_registry() {
731 registry.unregister(&client_id);
732 }
733
734 let drop_client_id = client_id.clone();
735 tokio::task::spawn_blocking(move || {
736 drop(session); log::trace!(
738 "Client session subscriptions torn down for {}",
739 drop_client_id
740 );
741 });
742
743 if let Err(e) = ctx.del(&client_entity) {
745 log::error!("Failed to delete client entity: {e}");
746 }
747
748 log::info!("Client disconnected: {} from {}", client_id, addr);
749
750 Ok(())
751 }
752
753 #[allow(clippy::too_many_arguments)]
755 fn handle_message<W: WsWriter>(
756 session: &mut ClientSession<W>,
757 ctx: Arc<CellServerCtx>,
758 priority_tx: &mpsc::Sender<MykoMessage>,
759 drop_logger: &Arc<DropLogger>,
760 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
761 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
762 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
763 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
764 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
765 command_tx: &mpsc::UnboundedSender<CommandJob>,
766 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
767 msg: MykoMessage,
768 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
769 crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
772
773 let handler_registry = ctx.handler_registry.clone();
774
775 let registry = ctx.registry.clone();
776
777 let host_id = ctx.host_id;
778
779 match msg {
780 MykoMessage::Query(wrapped) => {
781 let tx_id: Arc<str> = wrapped
783 .query
784 .get("tx")
785 .and_then(|v| v.as_str())
786 .unwrap_or("unknown")
787 .into();
788 let query_id = &wrapped.query_id;
789 let entity_type = &wrapped.query_item_type;
790
791 if session.has_subscription(&tx_id) {
794 log::debug!(
795 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
796 session.client_id,
797 tx_id,
798 query_id,
799 entity_type
800 );
801 return Ok(());
802 }
803 if let Ok(mut map) = query_ids_by_tx.lock() {
804 map.insert(tx_id.clone(), query_id.clone());
805 }
806 if let Ok(mut map) = subscribe_started_by_tx.lock() {
807 map.entry(tx_id.clone()).or_insert_with(Instant::now);
808 }
809
810 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
811 log::trace!(
812 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
813 session.client_id,
814 tx_id,
815 query_id,
816 entity_type,
817 wrapped.window.is_some(),
818 session.subscription_count()
819 );
820
821 let request_context = Arc::new(RequestContext::from_client(
822 tx_id.clone(),
823 session.client_id.clone(),
824 host_id,
825 ));
826
827 if let Some(query_data) = handler_registry.get_query(query_id) {
828 let parsed = (query_data.parse)(wrapped.query.clone());
829 match parsed {
830 Ok(any_query) => {
831 let cell_factory = query_data.cell_factory;
834 let registry = registry.clone();
835 let request_context = request_context.clone();
836 let ctx = ctx.clone();
837 let window = wrapped.window.clone();
838 let query_id = query_id.clone();
839 let sub_tx = subscribe_tx.clone();
840 tokio::task::spawn_blocking(move || {
841 match cell_factory(any_query, registry, request_context, Some(ctx))
842 {
843 Ok(filtered_cellmap) => {
844 let _ = sub_tx.send(SubscriptionReady::Query {
845 tx_id,
846 cellmap: filtered_cellmap,
847 window,
848 });
849 }
850 Err(e) => {
851 log::error!(
852 "Failed to create query cell for {}: {}",
853 query_id,
854 e
855 );
856 }
857 }
858 });
859 }
860 Err(e) => {
861 log::error!(
862 "Failed to parse query {}: {} | payload: {}",
863 query_id,
864 e,
865 serde_json::to_string(&wrapped.query).unwrap_or_default()
866 );
867 }
868 }
869 } else {
870 log::warn!(
872 "No registered query handler for {}, falling back to select all",
873 query_id
874 );
875 let store: myko::store::EntityStore =
876 (*registry.get_or_create(entity_type)).clone();
877 let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
878 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
879 }
880 }
881
882 MykoMessage::View(wrapped) => {
883 let tx_id: Arc<str> = wrapped
884 .view
885 .get("tx")
886 .and_then(|v| v.as_str())
887 .unwrap_or("unknown")
888 .into();
889 let view_id = &wrapped.view_id;
890 let item_type = &wrapped.view_item_type;
891
892 if session.has_subscription(&tx_id) {
894 log::debug!(
895 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
896 session.client_id,
897 tx_id,
898 view_id,
899 item_type
900 );
901 return Ok(());
902 }
903 if let Ok(mut map) = view_ids_by_tx.lock() {
904 map.insert(tx_id.clone(), view_id.clone());
905 }
906 if let Ok(mut map) = subscribe_started_by_tx.lock() {
907 map.entry(tx_id.clone()).or_insert_with(Instant::now);
908 }
909
910 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
911 log::trace!(
912 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
913 session.client_id,
914 tx_id,
915 view_id,
916 item_type,
917 wrapped.window
918 );
919
920 let request_context = Arc::new(RequestContext::from_client(
921 tx_id.clone(),
922 session.client_id.clone(),
923 host_id,
924 ));
925
926 if let Some(view_data) = handler_registry.get_view(view_id) {
927 let parsed = (view_data.parse)(wrapped.view.clone());
928 match parsed {
929 Ok(any_view) => {
930 log::trace!(
931 "View parsed successfully client={} tx={} view_id={}",
932 session.client_id,
933 tx_id,
934 view_id
935 );
936 let cell_factory = view_data.cell_factory;
939 let registry = registry.clone();
940 let ctx = ctx.clone();
941 let window = wrapped.window.clone();
942 let view_id_clone = view_id.clone();
943 let sub_tx = subscribe_tx.clone();
944 let priority_tx = priority_tx.clone();
945 let drop_logger = drop_logger.clone();
946 tokio::task::spawn_blocking(move || {
947 match cell_factory(any_view, registry, request_context, ctx) {
948 Ok(filtered_cellmap) => {
949 log::trace!(
950 "View cell factory succeeded tx={} view_id={}",
951 tx_id,
952 view_id_clone
953 );
954 let _ = sub_tx.send(SubscriptionReady::View {
955 tx_id,
956 view_id: view_id_clone,
957 cellmap: filtered_cellmap,
958 window,
959 });
960 }
961 Err(e) => {
962 log::error!(
963 "Failed to create view cell for {}: {}",
964 view_id_clone,
965 e
966 );
967 if let Err(err) = priority_tx.try_send(
968 MykoMessage::ViewError(ViewError {
969 tx: tx_id.to_string(),
970 view_id: view_id_clone.to_string(),
971 message: e,
972 }),
973 ) {
974 drop_logger.on_drop("ViewError", &err);
975 }
976 }
977 }
978 });
979 }
980 Err(e) => {
981 let message = format!("Failed to parse view {}: {}", view_id, e);
982 log::error!(
983 "{} | payload: {}",
984 message,
985 serde_json::to_string(&wrapped.view).unwrap_or_default()
986 );
987 if let Err(err) =
988 priority_tx.try_send(MykoMessage::ViewError(ViewError {
989 tx: tx_id.to_string(),
990 view_id: view_id.to_string(),
991 message,
992 }))
993 {
994 drop_logger.on_drop("ViewError", &err);
995 }
996 }
997 }
998 } else {
999 let message = format!("No registered handler for view: {}", view_id);
1000 log::warn!("{}", message);
1001 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1002 tx: tx_id.to_string(),
1003 view_id: view_id.to_string(),
1004 message,
1005 })) {
1006 drop_logger.on_drop("ViewError", &err);
1007 }
1008 }
1009 }
1010
1011 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1012 log::trace!(
1013 "QueryCancel received: client={} tx={}",
1014 session.client_id,
1015 tx_id
1016 );
1017 let tx_id: Arc<str> = tx_id.into();
1018 if let Ok(mut map) = query_ids_by_tx.lock() {
1019 map.remove(&tx_id);
1020 }
1021 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1022 map.remove(&tx_id);
1023 }
1024 session.cancel(&tx_id);
1025 }
1026
1027 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1028 let tx_id: Arc<str> = tx.into();
1029 log::trace!(
1030 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1031 session.client_id,
1032 tx_id,
1033 window.is_some(),
1034 session.subscription_count()
1035 );
1036 session.update_query_window(&tx_id, window);
1037 }
1038 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1039 log::trace!("View cancel: {}", tx_id);
1040 let tx_id: Arc<str> = tx_id.into();
1041 if let Ok(mut map) = view_ids_by_tx.lock() {
1042 map.remove(&tx_id);
1043 }
1044 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1045 map.remove(&tx_id);
1046 }
1047 session.cancel(&tx_id);
1048 }
1049 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1050 let tx_id: Arc<str> = tx.into();
1051 log::trace!("View window update: {}", tx_id);
1052 session.update_view_window(&tx_id, window);
1053 }
1054
1055 MykoMessage::Report(wrapped) => {
1056 let tx_id: Arc<str> = wrapped
1058 .report
1059 .get("tx")
1060 .and_then(|v| v.as_str())
1061 .unwrap_or("unknown")
1062 .into();
1063 let report_id = &wrapped.report_id;
1064
1065 log::trace!(
1066 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1067 session.client_id,
1068 tx_id,
1069 report_id,
1070 session.subscription_count()
1071 );
1072
1073 if let Some(report_data) = handler_registry.get_report(report_id) {
1075 let parsed = (report_data.parse)(wrapped.report.clone());
1077 match parsed {
1078 Ok(any_report) => {
1079 let request_context = Arc::new(RequestContext::from_client(
1080 tx_id.clone(),
1081 session.client_id.clone(),
1082 host_id,
1083 ));
1084
1085 match (report_data.cell_factory)(any_report, request_context, ctx) {
1087 Ok(cell) => {
1088 session.subscribe_report(
1089 tx_id,
1090 report_id.as_str().into(),
1091 cell,
1092 );
1093 }
1094 Err(e) => {
1095 log::error!(
1096 "Failed to create report cell for {}: {}",
1097 report_id,
1098 e
1099 );
1100 }
1101 }
1102 }
1103 Err(e) => {
1104 log::error!(
1105 "Failed to parse report {}: {} | payload: {}",
1106 report_id,
1107 e,
1108 serde_json::to_string(&wrapped.report).unwrap_or_default()
1109 );
1110 }
1111 }
1112 } else {
1113 log::warn!("No registered handler for report: {}", report_id);
1114 }
1115 }
1116
1117 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1118 log::trace!(
1119 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1120 session.client_id,
1121 tx_id,
1122 session.subscription_count()
1123 );
1124 session.cancel(&tx_id.into());
1125 }
1126
1127 MykoMessage::Event(mut event) => {
1128 event.sanitize_null_bytes();
1129 normalize_incoming_event(&mut event, &session.client_id, host_id);
1130 if let Err(e) = ctx.apply_event(event) {
1131 log::error!(
1132 "Failed to apply event from client {}: {e}",
1133 session.client_id
1134 );
1135 }
1136 }
1137
1138 MykoMessage::EventBatch(mut events) => {
1139 let incoming = events.len();
1140 if incoming >= 64 {
1141 log::trace!(
1142 "Received event batch from client {} size={}",
1143 session.client_id,
1144 incoming
1145 );
1146 }
1147 for event in &mut events {
1148 event.sanitize_null_bytes();
1149 normalize_incoming_event(event, &session.client_id, host_id);
1150 }
1151 match ctx.apply_event_batch(events) {
1152 Ok(applied) => {
1153 log::trace!(
1154 "Applied event batch from client {} size={}",
1155 session.client_id,
1156 applied
1157 );
1158 }
1159 Err(e) => {
1160 log::error!(
1161 "Failed to apply event batch from client {}: {}",
1162 session.client_id,
1163 e
1164 );
1165 }
1166 }
1167 }
1168
1169 MykoMessage::Command(wrapped) => {
1170 let tx_id: Arc<str> = wrapped
1172 .command
1173 .get("tx")
1174 .and_then(|v| v.as_str())
1175 .unwrap_or("unknown")
1176 .into();
1177
1178 let command_id = &wrapped.command_id;
1179
1180 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1181 let received_at = Instant::now();
1182 if let Ok(mut map) = command_started_by_tx.lock() {
1183 map.insert(tx_id.clone(), received_at);
1184 }
1185 if let Err(e) = command_tx.send(CommandJob {
1186 tx_id: tx_id.clone(),
1187 command_id: wrapped.command_id.clone(),
1188 command: wrapped.command.clone(),
1189 received_at,
1190 }) {
1191 log::error!(
1192 "Failed to enqueue command {} for client {} tx={}: {}",
1193 command_id,
1194 session.client_id,
1195 tx_id,
1196 e
1197 );
1198 let error = MykoMessage::CommandError(CommandError {
1199 tx: tx_id.to_string(),
1200 command_id: command_id.to_string(),
1201 message: "Command queue unavailable".to_string(),
1202 });
1203 if let Err(err) = priority_tx.try_send(error) {
1204 drop_logger.on_drop("CommandError", &err);
1205 }
1206 }
1207 }
1208
1209 MykoMessage::Ping(PingData { id, timestamp }) => {
1210 let pong = MykoMessage::Ping(PingData { id, timestamp });
1212 if let Err(e) = priority_tx.try_send(pong) {
1213 drop_logger.on_drop("Ping", &e);
1214 }
1215 }
1216
1217 MykoMessage::QueryResponse(resp) => {
1219 log::warn!(
1220 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1221 session.client_id,
1222 resp.tx,
1223 resp.sequence,
1224 resp.upserts.len(),
1225 resp.deletes.len(),
1226 session.subscription_count()
1227 );
1228 }
1229 MykoMessage::QueryError(err) => {
1230 log::warn!(
1231 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1232 session.client_id,
1233 err.tx,
1234 err.query_id,
1235 err.message,
1236 session.subscription_count()
1237 );
1238 }
1239 MykoMessage::ViewResponse(resp) => {
1240 log::warn!(
1241 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1242 session.client_id,
1243 resp.tx,
1244 resp.sequence,
1245 resp.upserts.len(),
1246 resp.deletes.len(),
1247 session.subscription_count()
1248 );
1249 }
1250 MykoMessage::ViewError(err) => {
1251 log::warn!(
1252 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1253 session.client_id,
1254 err.tx,
1255 err.view_id,
1256 err.message,
1257 session.subscription_count()
1258 );
1259 }
1260 MykoMessage::ReportResponse(resp) => {
1261 log::warn!(
1262 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1263 session.client_id,
1264 resp.tx,
1265 session.subscription_count()
1266 );
1267 }
1268 MykoMessage::ReportError(err) => {
1269 log::warn!(
1270 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1271 session.client_id,
1272 err.tx,
1273 err.report_id,
1274 err.message,
1275 session.subscription_count()
1276 );
1277 }
1278 MykoMessage::CommandResponse(resp) => {
1279 if resp.tx.trim().is_empty() {
1280 log::warn!(
1281 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1282 session.client_id,
1283 session.subscription_count()
1284 );
1285 } else {
1286 let correlated = outbound_commands_by_tx
1287 .lock()
1288 .ok()
1289 .and_then(|mut map| map.remove(&resp.tx));
1290 if let Some((command_id, started)) = correlated {
1291 log::trace!(
1296 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1297 session.client_id,
1298 resp.tx,
1299 command_id,
1300 started.elapsed().as_millis(),
1301 session.subscription_count()
1302 );
1303 } else {
1304 log::warn!(
1305 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1306 session.client_id,
1307 resp.tx,
1308 session.subscription_count()
1309 );
1310 }
1311 }
1312 }
1313 MykoMessage::CommandError(err) => {
1314 if err.tx.trim().is_empty() {
1315 log::warn!(
1316 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1317 session.client_id,
1318 err.command_id,
1319 err.message,
1320 session.subscription_count()
1321 );
1322 } else {
1323 let correlated = outbound_commands_by_tx
1324 .lock()
1325 .ok()
1326 .and_then(|mut map| map.remove(&err.tx));
1327 if let Some((command_id, started)) = correlated {
1328 log::warn!(
1329 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1330 session.client_id,
1331 err.tx,
1332 err.command_id,
1333 command_id,
1334 err.message,
1335 started.elapsed().as_millis(),
1336 session.subscription_count()
1337 );
1338 } else {
1339 log::warn!(
1340 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1341 session.client_id,
1342 err.tx,
1343 err.command_id,
1344 err.message,
1345 session.subscription_count()
1346 );
1347 }
1348 }
1349 }
1350 MykoMessage::Benchmark(payload) => {
1351 let stats = ws_benchmark_stats();
1352 ensure_ws_benchmark_logger();
1353 stats.message_count.fetch_add(1, Ordering::Relaxed);
1354 let size = payload.to_string().len() as u64;
1356 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1357 }
1358 }
1359
1360 Ok(())
1361 }
1362
1363 fn execute_command_job(
1364 ctx: Arc<CellServerCtx>,
1365 priority_tx: &mpsc::Sender<MykoMessage>,
1366 drop_logger: &DropLogger,
1367 client_id: Arc<str>,
1368 job: CommandJob,
1369 ) {
1370 let host_id = ctx.host_id;
1371 let started = Instant::now();
1372 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1373 let command_id = job.command_id.clone();
1374
1375 let mut handler_found = false;
1376 for registration in inventory::iter::<CommandHandlerRegistration> {
1377 if registration.command_id == command_id {
1378 handler_found = true;
1379 let executor = (registration.factory)();
1380
1381 let req = Arc::new(RequestContext::from_client(
1382 job.tx_id.clone(),
1383 client_id.clone(),
1384 host_id,
1385 ));
1386 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1387 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1388 let execute_started = Instant::now();
1389
1390 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1391 Ok(result) => {
1392 let response = MykoMessage::CommandResponse(CommandResponse {
1393 response: result,
1394 tx: job.tx_id.to_string(),
1395 });
1396 if let Err(e) = priority_tx.try_send(response) {
1397 drop_logger.on_drop("CommandResponse", &e);
1398 }
1399 }
1400 Err(e) => {
1401 let error = MykoMessage::CommandError(CommandError {
1402 tx: job.tx_id.to_string(),
1403 command_id: command_id.clone(),
1404 message: e.message,
1405 });
1406 if let Err(err) = priority_tx.try_send(error) {
1407 drop_logger.on_drop("CommandError", &err);
1408 }
1409 }
1410 }
1411 let execute_ms = execute_started.elapsed().as_millis();
1412 let total_ms = job.received_at.elapsed().as_millis();
1413 log::trace!(
1414 target: "myko_server::ws_perf",
1415 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1416 client_id,
1417 job.tx_id,
1418 command_id,
1419 queue_wait_ms,
1420 execute_ms,
1421 total_ms
1422 );
1423 break;
1424 }
1425 }
1426
1427 if !handler_found {
1428 log::warn!("No registered handler for command: {}", command_id);
1429 let error = MykoMessage::CommandError(CommandError {
1430 tx: job.tx_id.to_string(),
1431 command_id: command_id.clone(),
1432 message: format!("Command handler not found: {}", command_id),
1433 });
1434 if let Err(e) = priority_tx.try_send(error) {
1435 drop_logger.on_drop("CommandError", &e);
1436 }
1437 }
1438
1439 if !handler_found {
1440 log::debug!(
1441 target: "myko_server::ws_perf",
1442 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1443 client_id,
1444 job.tx_id,
1445 command_id,
1446 queue_wait_ms,
1447 job.received_at.elapsed().as_millis()
1448 );
1449 }
1450 }
1451}
1452
/// `WsWriter` implementation that forwards everything to two tokio `mpsc` senders.
///
/// Every send goes through `try_send`, so writing never waits for queue space.
struct ChannelWriter {
    /// Queue for `OutboundMessage` items (plain messages and serialized commands).
    tx: mpsc::Sender<OutboundMessage>,
    /// Queue for `DeferredOutbound` items (report responses and query/view responses).
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Receives the `try_send` error whenever an item is rejected for a reason
    /// other than the channel being closed.
    drop_logger: Arc<DropLogger>,
    /// Outgoing wire format stored as a `u8`; `protocol()` converts it with
    /// `MykoProtocol::from`.
    outgoing_format: Arc<AtomicU8>,
}
1463
impl ChannelWriter {
    /// Returns `true` when the `tx` channel reports itself closed.
    #[inline]
    fn tx_dead(&self) -> bool {
        self.tx.is_closed()
    }

    /// Returns `true` when the `deferred_tx` channel reports itself closed.
    #[inline]
    fn deferred_dead(&self) -> bool {
        self.deferred_tx.is_closed()
    }
}
1482
1483impl WsWriter for ChannelWriter {
1484 fn send(&self, msg: MykoMessage) {
1485 if self.tx_dead() {
1492 return;
1493 }
1494 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1495 if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1498 self.drop_logger.on_drop("message", &e);
1499 }
1500 }
1501 }
1502
1503 fn protocol(&self) -> MykoProtocol {
1504 MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1505 }
1506
1507 fn send_serialized_command(
1508 &self,
1509 tx: Arc<str>,
1510 command_id: String,
1511 payload: EncodedCommandMessage,
1512 ) {
1513 if self.tx_dead() {
1514 return;
1515 }
1516 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1517 tx,
1518 command_id,
1519 payload,
1520 }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1521 {
1522 self.drop_logger.on_drop("serialized_command", &e);
1523 }
1524 }
1525
1526 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1527 if self.deferred_dead() {
1528 return;
1529 }
1530 if let Err(e) = self
1531 .deferred_tx
1532 .try_send(DeferredOutbound::Report(tx, output))
1533 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1534 {
1535 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1536 }
1537 }
1538
1539 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1540 if self.deferred_dead() {
1541 return;
1542 }
1543 if let Err(e) = self
1544 .deferred_tx
1545 .try_send(DeferredOutbound::Query { response, is_view })
1546 && !matches!(e, mpsc::error::TrySendError::Closed(_))
1547 {
1548 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1549 }
1550 }
1551}
1552
#[cfg(test)]
mod tests {
    use super::*;

    /// `send` must place exactly one `OutboundMessage::Message` on the outbound queue.
    #[test]
    fn test_channel_writer() {
        let (tx, mut rx) = mpsc::channel(10);
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let drop_logger = Arc::new(DropLogger::new("test-client".into()));
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger,
            outgoing_format: Arc::new(AtomicU8::new(MykoProtocol::JSON as u8)),
        };

        let msg = MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        });
        writer.send(msg);

        let received = rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
        // Only one item was sent, so the queue must now be empty.
        assert!(rx.try_recv().is_err());
    }

    /// The shared format flag starts as JSON, and a store of CBOR is visible both
    /// on the raw atomic and through `ChannelWriter::protocol()`.
    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        use std::sync::atomic::{AtomicU8, Ordering};

        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
        let (tx, _rx) = mpsc::channel(1);
        let (deferred_tx, _deferred_rx) = mpsc::channel(1);
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            // Share the flag so a store made here is what the writer reads.
            outgoing_format: Arc::clone(&outgoing_format),
        };

        assert_eq!(
            MykoProtocol::from(outgoing_format.load(Ordering::SeqCst)),
            MykoProtocol::JSON,
        );
        assert_eq!(writer.protocol(), MykoProtocol::JSON);

        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(
            MykoProtocol::from(outgoing_format.load(Ordering::SeqCst)),
            MykoProtocol::CBOR,
        );

        // The writer observes the promotion through the shared atomic.
        assert_eq!(writer.protocol(), MykoProtocol::CBOR);
    }
}