Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    client::MykoProtocol,
22    command::{CommandContext, CommandHandlerRegistration},
23    entities::client::{Client, ClientId},
24    relationship::{
25        iter_client_id_registrations, iter_fallback_to_id_registrations,
26        iter_server_owned_registrations,
27    },
28    report::AnyOutput,
29    request::RequestContext,
30    server::{
31        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
32        client_registry::try_client_registry,
33    },
34    wire::{
35        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
36        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
37    },
38};
39use tokio::{net::TcpStream, sync::mpsc, time::interval};
40use tokio_tungstenite::{
41    accept_async_with_config,
42    tungstenite::{Message, protocol::WebSocketConfig},
43};
44use uuid::Uuid;
45
46struct WsIngestStats {
47    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
48}
49
/// Rolling WebSocket throughput counters reported by the benchmark logger.
///
/// Both counters are swapped back to zero each time the
/// `ws-benchmark-logger` thread reports them. The code that increments them
/// is not in this part of the file.
struct WsBenchmarkStats {
    /// Messages counted since the last report.
    message_count: AtomicU64,
    /// Bytes counted since the last report.
    total_bytes: AtomicU64,
}
54
55static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
56static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
57static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
58static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    bytes / count
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[server_owned] fields with this server's ID
192    for reg in iter_server_owned_registrations() {
193        if reg.entity_type == event.item_type {
194            if let Some(obj) = event.item.as_object_mut() {
195                let field = reg.field_name_json;
196                let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
197                if current.is_empty() {
198                    obj.insert(
199                        field.to_string(),
200                        serde_json::Value::String(host_id.to_string()),
201                    );
202                }
203            }
204            break;
205        }
206    }
207
208    // Auto-populate #[fallback_to_id] fields with the entity's own id
209    // if the field is null or missing.
210    if let Some(obj) = event.item.as_object_mut()
211        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
212    {
213        for reg in iter_fallback_to_id_registrations() {
214            if reg.entity_type == event.item_type {
215                let field = reg.field_name_json;
216                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
217                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
218                }
219            }
220        }
221    }
222}
223
224/// Per-connection drop tracking to avoid log storms when clients fall behind.
225///
226/// When the outbound channel is full, we will drop messages (same as today),
227/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
228struct DropLogger {
229    client_id: Arc<str>,
230    dropped: std::sync::atomic::AtomicU64,
231    last_log_ms: std::sync::atomic::AtomicU64,
232}
233
234impl DropLogger {
235    fn new(client_id: Arc<str>) -> Self {
236        Self {
237            client_id,
238            dropped: std::sync::atomic::AtomicU64::new(0),
239            last_log_ms: std::sync::atomic::AtomicU64::new(0),
240        }
241    }
242
243    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
244        use std::sync::atomic::Ordering;
245
246        self.dropped.fetch_add(1, Ordering::Relaxed);
247
248        // Log at most once per second per connection.
249        let now_ms = std::time::SystemTime::now()
250            .duration_since(std::time::UNIX_EPOCH)
251            .unwrap_or_default()
252            .as_millis() as u64;
253        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
254        if now_ms.saturating_sub(last_ms) < 1000 {
255            return;
256        }
257
258        if self
259            .last_log_ms
260            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
261            .is_err()
262        {
263            return;
264        }
265
266        let n = self.dropped.swap(0, Ordering::Relaxed);
267        log::warn!(
268            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
269            n,
270            self.client_id,
271            kind,
272            err
273        );
274    }
275}
276
277struct CommandJob {
278    tx_id: Arc<str>,
279    command_id: String,
280    command: serde_json::Value,
281    received_at: Instant,
282}
283
284/// Result of an async subscription build (query or view cell_factory).
285enum SubscriptionReady {
286    Query {
287        tx_id: Arc<str>,
288        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
289        window: Option<myko::wire::QueryWindow>,
290    },
291    View {
292        tx_id: Arc<str>,
293        view_id: Arc<str>,
294        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
295        window: Option<myko::wire::QueryWindow>,
296    },
297}
298
299enum OutboundMessage {
300    Message(MykoMessage),
301    SerializedCommand {
302        tx: Arc<str>,
303        command_id: String,
304        payload: EncodedCommandMessage,
305    },
306}
307
308enum DeferredOutbound {
309    Report(Arc<str>, Arc<dyn AnyOutput>),
310    Query {
311        response: PendingQueryResponse,
312        is_view: bool,
313    },
314}
315
/// Entry point for serving one WebSocket client connection.
///
/// A stateless unit type: all behaviour lives in its associated functions.
pub struct WsHandler;
318
319impl WsHandler {
320    /// Handle a new WebSocket connection (performs the handshake).
321    pub async fn handle_connection(
322        stream: TcpStream,
323        addr: SocketAddr,
324        ctx: Arc<CellServerCtx>,
325    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
326        let mut ws_config = WebSocketConfig::default();
327        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
328        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
329        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
330        Self::handle_upgraded(ws_stream, addr, ctx).await
331    }
332
333    /// Handle a WebSocket connection whose HTTP/1.1 handshake has already
334    /// completed and produced a [`tokio_tungstenite::WebSocketStream`].
335    ///
336    /// Used by the front-door router when it pre-parses the HTTP request
337    /// (to dispatch between `/myko` WS and `/myko/mcp` HTTP/WS) and then
338    /// completes the WS handshake itself.
339    #[allow(clippy::too_many_arguments)]
340    pub async fn handle_upgraded(
341        ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
342        addr: SocketAddr,
343        ctx: Arc<CellServerCtx>,
344    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345        ensure_ws_ingest_logger();
346
347        let host_id = ctx.host_id;
348
349        let (mut write, mut read) = ws_stream.split();
350
351        // Create a bounded channel for sending messages to the client
352        // High limit (10k) since we have good memory availability
353        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
354        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
355        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
356        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
357        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
358
359        // Outgoing format for this session: defaults to JSON, sticky-promotes
360        // to CBOR on the first received binary frame. Never demotes.
361        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
362
363        // Create client session with channel-based writer
364        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
365        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
366        let writer = ChannelWriter {
367            tx: tx.clone(),
368            deferred_tx: deferred_tx.clone(),
369            drop_logger: drop_logger.clone(),
370            outgoing_format: outgoing_format.clone(),
371        };
372
373        // Register writer in the global client registry (if initialized)
374        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
375            tx: tx.clone(),
376            deferred_tx: deferred_tx.clone(),
377            drop_logger: drop_logger.clone(),
378            outgoing_format: outgoing_format.clone(),
379        });
380        if let Some(registry) = try_client_registry() {
381            registry.register(client_id.clone(), writer_arc);
382        }
383
384        let mut session = ClientSession::new(client_id.clone(), writer);
385
386        let outgoing_format_writer = outgoing_format.clone();
387        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
388            Arc::new(Mutex::new(HashMap::new()));
389        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
390            Arc::new(Mutex::new(HashMap::new()));
391        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
392            Arc::new(Mutex::new(HashMap::new()));
393        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
394            Arc::new(Mutex::new(HashMap::new()));
395        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
396            Arc::new(Mutex::new(HashMap::new()));
397
398        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
399
400        // Publish Client entity
401        let client_entity = Client {
402            id: ClientId(client_id.clone()),
403            server_id: host_id.to_string().into(),
404            address: Some(Arc::from(addr.to_string())),
405            windback: None,
406        };
407        if let Err(e) = ctx.set(&client_entity) {
408            log::error!("Failed to persist client entity: {e}");
409        }
410
411        log::info!("Client connected: {} from {}", client_id, addr);
412
413        let write_ctx = ctx.clone();
414        let write_client_id = client_id.clone();
415        let write_addr = addr;
416        let command_ctx = ctx.clone();
417        let command_priority_tx = priority_tx.clone();
418        let command_drop_logger = drop_logger.clone();
419        let command_client_id = client_id.clone();
420
421        // Spawn task to forward messages from channel to WebSocket
422        let write_task = tokio::spawn(async move {
423            let _ctx = write_ctx;
424            let mut normal_open = true;
425            let mut priority_open = true;
426            let mut deferred_open = true;
427            while normal_open || priority_open || deferred_open {
428                let msg = tokio::select! {
429                    biased;
430                    maybe = priority_rx.recv(), if priority_open => {
431                        match maybe {
432                            Some(msg) => OutboundMessage::Message(msg),
433                            None => {
434                                priority_open = false;
435                                continue;
436                            }
437                        }
438                    }
439                    maybe = deferred_rx.recv(), if deferred_open => {
440                        match maybe {
441                            Some(DeferredOutbound::Report(tx, output)) => {
442                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
443                                    response: output.to_value(),
444                                    tx: tx.to_string(),
445                                }))
446                            }
447                            Some(DeferredOutbound::Query { response, is_view }) => {
448                                if is_view {
449                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
450                                } else {
451                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
452                                }
453                            }
454                            None => {
455                                deferred_open = false;
456                                continue;
457                            }
458                        }
459                    }
460                    maybe = rx.recv(), if normal_open => {
461                        match maybe {
462                            Some(msg) => msg,
463                            None => {
464                                normal_open = false;
465                                continue;
466                            }
467                        }
468                    }
469                };
470                // Per-message timing tap (outbound). Counts by kind into the
471                // global ws_timing instrumentation. Counterpart to the inbound
472                // tap in `handle_message`.
473                match &msg {
474                    OutboundMessage::SerializedCommand { .. } => {
475                        crate::ws_timing::record_outbound("Command")
476                    }
477                    OutboundMessage::Message(m) => {
478                        crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
479                    }
480                };
481                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
482                    OutboundMessage::SerializedCommand { tx, .. } => {
483                        ("command", Some(tx.clone()), None, None, None, None)
484                    }
485                    OutboundMessage::Message(msg) => match msg {
486                        MykoMessage::ViewResponse(r) => (
487                            "view_response",
488                            Some(r.tx.clone()),
489                            Some(r.sequence),
490                            Some(r.upserts.len()),
491                            Some(r.deletes.len()),
492                            r.total_count,
493                        ),
494                        MykoMessage::QueryResponse(r) => (
495                            "query_response",
496                            Some(r.tx.clone()),
497                            Some(r.sequence),
498                            Some(r.upserts.len()),
499                            Some(r.deletes.len()),
500                            r.total_count,
501                        ),
502                        MykoMessage::CommandResponse(r) => (
503                            "command_response",
504                            Some(Arc::<str>::from(r.tx.clone())),
505                            None,
506                            None,
507                            None,
508                            None,
509                        ),
510                        MykoMessage::CommandError(r) => (
511                            "command_error",
512                            Some(Arc::<str>::from(r.tx.clone())),
513                            None,
514                            None,
515                            None,
516                            None,
517                        ),
518                        _ => ("other", None, None, None, None, None),
519                    },
520                };
521
522                match &msg {
523                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
524                        if !tx.trim().is_empty()
525                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
526                        {
527                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
528                        }
529                    }
530                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
531                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
532                            && !tx_id.trim().is_empty()
533                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
534                        {
535                            map.insert(
536                                tx_id.to_string(),
537                                (wrapped.command_id.clone(), Instant::now()),
538                            );
539                        }
540                    }
541                    _ => {}
542                }
543
544                let ws_msg = match &msg {
545                    OutboundMessage::SerializedCommand {
546                        payload: EncodedCommandMessage::Json(json),
547                        ..
548                    } => Message::Text(json.clone().into()),
549                    OutboundMessage::SerializedCommand {
550                        payload: EncodedCommandMessage::Cbor(bytes),
551                        ..
552                    } => Message::Binary(bytes.clone().into()),
553                    OutboundMessage::Message(msg)
554                        if outgoing_format_writer.load(Ordering::SeqCst)
555                            == MykoProtocol::CBOR as u8 =>
556                    {
557                        let mut bytes = Vec::new();
558                        match ciborium::ser::into_writer(msg, &mut bytes) {
559                            Ok(()) => Message::Binary(bytes.into()),
560                            Err(e) => {
561                                log::error!("Failed to serialize message to CBOR: {}", e);
562                                continue;
563                            }
564                        }
565                    }
566                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
567                        Ok(json) => Message::Text(json.into()),
568                        Err(e) => {
569                            log::error!("Failed to serialize message to JSON: {}", e);
570                            continue;
571                        }
572                    },
573                };
574                let payload_bytes = match &ws_msg {
575                    Message::Binary(b) => b.len(),
576                    Message::Text(t) => t.len(),
577                    _ => 0,
578                };
579
580                if let Err(err) = write.send(ws_msg).await {
581                    log::error!(
582                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
583                        write_client_id,
584                        write_addr,
585                        kind,
586                        tx_id,
587                        seq,
588                        payload_bytes,
589                        outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
590                        err
591                    );
592                    break;
593                }
594            }
595            // NOTE(ts): Unregister from client registry immediately so the node
596            // executor stops serializing commands into a dead channel.
597            if let Some(registry) = try_client_registry() {
598                registry.unregister(&write_client_id);
599                log::info!(
600                    "WebSocket writer unregistered client {} from {} (write task exiting)",
601                    write_client_id,
602                    write_addr,
603                );
604            }
605            log::warn!(
606                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
607                write_client_id,
608                write_addr,
609                normal_open,
610                priority_open,
611                deferred_open
612            );
613        });
614
615        // Execute commands on a dedicated worker so ping/cancel traffic is never
616        // blocked by long-running command handlers.
617        let command_started_cleanup = command_started_by_tx.clone();
618        let command_task = tokio::spawn(async move {
619            while let Some(job) = command_rx.recv().await {
620                let command_ctx = command_ctx.clone();
621                let command_priority_tx = command_priority_tx.clone();
622                let command_drop_logger = command_drop_logger.clone();
623                let command_client_id = command_client_id.clone();
624                let tx_id = job.tx_id.clone();
625                let started_map = command_started_cleanup.clone();
626                match tokio::task::spawn_blocking(move || {
627                    Self::execute_command_job(
628                        command_ctx,
629                        &command_priority_tx,
630                        command_drop_logger.as_ref(),
631                        command_client_id,
632                        job,
633                    );
634                })
635                .await
636                {
637                    Ok(()) => {}
638                    Err(e) => {
639                        log::error!("Command worker panicked: {}", e);
640                    }
641                }
642                // NOTE(ts): Clean up timing entry after command completes (success or panic).
643                if let Ok(mut map) = started_map.lock() {
644                    map.remove(&tx_id);
645                }
646            }
647        });
648
649        // Process incoming messages and completed subscription builds concurrently.
650        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
651        // so they don't block command processing or other messages.
652        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
653        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
654        loop {
655            tokio::select! {
656                // Completed subscription builds — register with session
657                Some(ready) = subscribe_rx.recv() => {
658                    let tx_id = match &ready {
659                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
660                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
661                    };
662                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
663                        map.remove(&tx_id);
664                    }
665                    match ready {
666                        SubscriptionReady::Query { tx_id, cellmap, window } => {
667                            session.subscribe_query(tx_id, cellmap, window);
668                        }
669                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
670                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
671                        }
672                    }
673                }
674                // NOTE(ts): Sweep outbound command entries older than 10s.
675                // Responses normally arrive quickly; stale entries are from
676                // dropped connections or commands that will never get a response.
677                _ = outbound_ttl_interval.tick() => {
678                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
679                        let before = map.len();
680                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
681                        let removed = before - map.len();
682                        if removed > 0 {
683                            log::debug!(
684                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
685                                session.client_id,
686                                removed,
687                                map.len()
688                            );
689                        }
690                    }
691                }
692                // Incoming WebSocket messages
693                msg = read.next() => {
694                    let Some(msg) = msg else { break };
695                    let ctx = ctx.clone();
696                    let msg = match msg {
697                        Ok(m) => m,
698                        Err(e) => {
699                            log::error!("WebSocket read error from {}: {}", client_id, e);
700                            break;
701                        }
702                    };
703
704                    match msg {
705                        Message::Binary(data) => {
706                            if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
707                                log::debug!(
708                                    "Client {} promoted outgoing format to CBOR via demonstration",
709                                    client_id
710                                );
711                                outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
712                            }
713
714                            match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
715                                Ok(myko_msg) => {
716                                    if let Err(e) = Self::handle_message(
717                                        &mut session,
718                                        ctx,
719                                        &priority_tx,
720                                        &drop_logger,
721                                        &query_ids_by_tx,
722                                        &view_ids_by_tx,
723                                        &subscribe_started_by_tx,
724                                        &command_started_by_tx,
725                                        &outbound_commands_by_tx,
726                                        &command_tx,
727                                        &subscribe_tx,
728                                        myko_msg,
729                                    ) {
730                                        log::error!("Error handling message: {}", e);
731                                    }
732                                    tokio::task::yield_now().await;
733                                }
734                                Err(e) => {
735                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
736                                }
737                            }
738                        }
739                        Message::Text(text) => {
740                            match serde_json::from_str::<MykoMessage>(&text) {
741                                Ok(myko_msg) => {
742                                    if let Err(e) = Self::handle_message(
743                                        &mut session,
744                                        ctx,
745                                        &priority_tx,
746                                        &drop_logger,
747                                        &query_ids_by_tx,
748                                        &view_ids_by_tx,
749                                        &subscribe_started_by_tx,
750                                        &command_started_by_tx,
751                                        &outbound_commands_by_tx,
752                                        &command_tx,
753                                        &subscribe_tx,
754                                        myko_msg,
755                                    ) {
756                                        log::error!("Error handling message: {}", e);
757                                    }
758                                    tokio::task::yield_now().await;
759                                }
760                                Err(e) => {
761                                    log::warn!(
762                                        "Failed to parse JSON message from {}: {} | raw: {}",
763                                        client_id,
764                                        e,
765                                        if text.len() > 1000 {
766                                            &text[..1000]
767                                        } else {
768                                            &text
769                                        }
770                                    );
771                                }
772                            }
773                        }
774                        Message::Ping(data) => {
775                            log::trace!("Ping from {}", client_id);
776                            let _ = data;
777                        }
778                        Message::Pong(_) => {
779                            log::trace!("Pong from {}", client_id);
780                        }
781                        Message::Close(frame) => {
782                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
783                            break;
784                        }
785                        Message::Frame(_) => {}
786                    }
787                }
788            }
789        }
790
791        // Cleanup. Order matters:
792        // 1. Abort the write/command tasks first so their channel receivers
793        //    are dropped — `ChannelWriter` clones held by subscriber
794        //    callbacks then short-circuit via `tx_dead()` / `deferred_dead()`
795        //    instead of doing useless work and emitting log spam while we
796        //    tear down the session.
797        // 2. Unregister the client writer from the global registry so other
798        //    parts of the system stop dispatching commands here.
799        // 3. Move the session drop to a blocking thread. `drop(session)`
800        //    cancels each subscription guard, and hyphae's per-cell
801        //    unsubscribe is O(N) in the cell's subscriber list (Vec clone +
802        //    filter + ArcSwap store), so a session with thousands of
803        //    subscriptions can take significant CPU time. Doing it on the
804        //    async runtime would block other connections' tasks; the
805        //    blocking pool is the right place for this.
806        write_task.abort();
807        command_task.abort();
808        if let Some(registry) = try_client_registry() {
809            registry.unregister(&client_id);
810        }
811
812        let drop_client_id = client_id.clone();
813        tokio::task::spawn_blocking(move || {
814            drop(session); // Drops all subscription guards
815            log::trace!(
816                "Client session subscriptions torn down for {}",
817                drop_client_id
818            );
819        });
820
821        // Delete Client entity
822        if let Err(e) = ctx.del(&client_entity) {
823            log::error!("Failed to delete client entity: {e}");
824        }
825
826        log::info!("Client disconnected: {} from {}", client_id, addr);
827
828        Ok(())
829    }
830
831    /// Handle a parsed MykoMessage.
832    #[allow(clippy::too_many_arguments)]
833    fn handle_message<W: WsWriter>(
834        session: &mut ClientSession<W>,
835        ctx: Arc<CellServerCtx>,
836        priority_tx: &mpsc::Sender<MykoMessage>,
837        drop_logger: &Arc<DropLogger>,
838        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
839        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
840        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
841        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
842        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
843        command_tx: &mpsc::UnboundedSender<CommandJob>,
844        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
845        msg: MykoMessage,
846    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
847        // Per-message timing tap. Counts inbound messages by kind into the
848        // global instrumentation. Periodic summarizer logs the deltas.
849        crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
850
851        let handler_registry = ctx.handler_registry.clone();
852
853        let registry = ctx.registry.clone();
854
855        let host_id = ctx.host_id;
856
857        match msg {
858            MykoMessage::Query(wrapped) => {
859                // Extract tx from the query JSON
860                let tx_id: Arc<str> = wrapped
861                    .query
862                    .get("tx")
863                    .and_then(|v| v.as_str())
864                    .unwrap_or("unknown")
865                    .into();
866                let query_id = &wrapped.query_id;
867                let entity_type = &wrapped.query_item_type;
868
869                // Defensive dedupe: some clients can replay the same subscribe request.
870                // A duplicate for the same tx would reset server-side sequence/window state.
871                if session.has_subscription(&tx_id) {
872                    log::debug!(
873                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
874                        session.client_id,
875                        tx_id,
876                        query_id,
877                        entity_type
878                    );
879                    return Ok(());
880                }
881                if let Ok(mut map) = query_ids_by_tx.lock() {
882                    map.insert(tx_id.clone(), query_id.clone());
883                }
884                if let Ok(mut map) = subscribe_started_by_tx.lock() {
885                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
886                }
887
888                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
889                log::trace!(
890                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
891                    session.client_id,
892                    tx_id,
893                    query_id,
894                    entity_type,
895                    wrapped.window.is_some(),
896                    session.subscription_count()
897                );
898
899                let request_context = Arc::new(RequestContext::from_client(
900                    tx_id.clone(),
901                    session.client_id.clone(),
902                    host_id,
903                ));
904
905                if let Some(query_data) = handler_registry.get_query(query_id) {
906                    let parsed = (query_data.parse)(wrapped.query.clone());
907                    match parsed {
908                        Ok(any_query) => {
909                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
910                            // block command processing or other messages.
911                            let cell_factory = query_data.cell_factory;
912                            let registry = registry.clone();
913                            let request_context = request_context.clone();
914                            let ctx = ctx.clone();
915                            let window = wrapped.window.clone();
916                            let query_id = query_id.clone();
917                            let sub_tx = subscribe_tx.clone();
918                            tokio::task::spawn_blocking(move || {
919                                match cell_factory(any_query, registry, request_context, Some(ctx))
920                                {
921                                    Ok(filtered_cellmap) => {
922                                        let _ = sub_tx.send(SubscriptionReady::Query {
923                                            tx_id,
924                                            cellmap: filtered_cellmap,
925                                            window,
926                                        });
927                                    }
928                                    Err(e) => {
929                                        log::error!(
930                                            "Failed to create query cell for {}: {}",
931                                            query_id,
932                                            e
933                                        );
934                                    }
935                                }
936                            });
937                        }
938                        Err(e) => {
939                            log::error!(
940                                "Failed to parse query {}: {} | payload: {}",
941                                query_id,
942                                e,
943                                serde_json::to_string(&wrapped.query).unwrap_or_default()
944                            );
945                        }
946                    }
947                } else {
948                    // Fall back to select all for unknown queries
949                    log::warn!(
950                        "No registered query handler for {}, falling back to select all",
951                        query_id
952                    );
953                    let store: myko::store::EntityStore =
954                        (*registry.get_or_create(entity_type)).clone();
955                    let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
956                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
957                }
958            }
959
960            MykoMessage::View(wrapped) => {
961                let tx_id: Arc<str> = wrapped
962                    .view
963                    .get("tx")
964                    .and_then(|v| v.as_str())
965                    .unwrap_or("unknown")
966                    .into();
967                let view_id = &wrapped.view_id;
968                let item_type = &wrapped.view_item_type;
969
970                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
971                if session.has_subscription(&tx_id) {
972                    log::debug!(
973                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
974                        session.client_id,
975                        tx_id,
976                        view_id,
977                        item_type
978                    );
979                    return Ok(());
980                }
981                if let Ok(mut map) = view_ids_by_tx.lock() {
982                    map.insert(tx_id.clone(), view_id.clone());
983                }
984                if let Ok(mut map) = subscribe_started_by_tx.lock() {
985                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
986                }
987
988                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
989                log::trace!(
990                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
991                    session.client_id,
992                    tx_id,
993                    view_id,
994                    item_type,
995                    wrapped.window
996                );
997
998                let request_context = Arc::new(RequestContext::from_client(
999                    tx_id.clone(),
1000                    session.client_id.clone(),
1001                    host_id,
1002                ));
1003
1004                if let Some(view_data) = handler_registry.get_view(view_id) {
1005                    let parsed = (view_data.parse)(wrapped.view.clone());
1006                    match parsed {
1007                        Ok(any_view) => {
1008                            log::trace!(
1009                                "View parsed successfully client={} tx={} view_id={}",
1010                                session.client_id,
1011                                tx_id,
1012                                view_id
1013                            );
1014                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
1015                            // block command processing or other messages.
1016                            let cell_factory = view_data.cell_factory;
1017                            let registry = registry.clone();
1018                            let ctx = ctx.clone();
1019                            let window = wrapped.window.clone();
1020                            let view_id_clone = view_id.clone();
1021                            let sub_tx = subscribe_tx.clone();
1022                            let priority_tx = priority_tx.clone();
1023                            let drop_logger = drop_logger.clone();
1024                            tokio::task::spawn_blocking(move || {
1025                                match cell_factory(any_view, registry, request_context, ctx) {
1026                                    Ok(filtered_cellmap) => {
1027                                        log::trace!(
1028                                            "View cell factory succeeded tx={} view_id={}",
1029                                            tx_id,
1030                                            view_id_clone
1031                                        );
1032                                        let _ = sub_tx.send(SubscriptionReady::View {
1033                                            tx_id,
1034                                            view_id: view_id_clone,
1035                                            cellmap: filtered_cellmap,
1036                                            window,
1037                                        });
1038                                    }
1039                                    Err(e) => {
1040                                        log::error!(
1041                                            "Failed to create view cell for {}: {}",
1042                                            view_id_clone,
1043                                            e
1044                                        );
1045                                        if let Err(err) = priority_tx.try_send(
1046                                            MykoMessage::ViewError(ViewError {
1047                                                tx: tx_id.to_string(),
1048                                                view_id: view_id_clone.to_string(),
1049                                                message: e,
1050                                            }),
1051                                        ) {
1052                                            drop_logger.on_drop("ViewError", &err);
1053                                        }
1054                                    }
1055                                }
1056                            });
1057                        }
1058                        Err(e) => {
1059                            let message = format!("Failed to parse view {}: {}", view_id, e);
1060                            log::error!(
1061                                "{} | payload: {}",
1062                                message,
1063                                serde_json::to_string(&wrapped.view).unwrap_or_default()
1064                            );
1065                            if let Err(err) =
1066                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
1067                                    tx: tx_id.to_string(),
1068                                    view_id: view_id.to_string(),
1069                                    message,
1070                                }))
1071                            {
1072                                drop_logger.on_drop("ViewError", &err);
1073                            }
1074                        }
1075                    }
1076                } else {
1077                    let message = format!("No registered handler for view: {}", view_id);
1078                    log::warn!("{}", message);
1079                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1080                        tx: tx_id.to_string(),
1081                        view_id: view_id.to_string(),
1082                        message,
1083                    })) {
1084                        drop_logger.on_drop("ViewError", &err);
1085                    }
1086                }
1087            }
1088
1089            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1090                log::trace!(
1091                    "QueryCancel received: client={} tx={}",
1092                    session.client_id,
1093                    tx_id
1094                );
1095                let tx_id: Arc<str> = tx_id.into();
1096                if let Ok(mut map) = query_ids_by_tx.lock() {
1097                    map.remove(&tx_id);
1098                }
1099                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1100                    map.remove(&tx_id);
1101                }
1102                session.cancel(&tx_id);
1103            }
1104
1105            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1106                let tx_id: Arc<str> = tx.into();
1107                log::trace!(
1108                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1109                    session.client_id,
1110                    tx_id,
1111                    window.is_some(),
1112                    session.subscription_count()
1113                );
1114                session.update_query_window(&tx_id, window);
1115            }
1116            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1117                log::trace!("View cancel: {}", tx_id);
1118                let tx_id: Arc<str> = tx_id.into();
1119                if let Ok(mut map) = view_ids_by_tx.lock() {
1120                    map.remove(&tx_id);
1121                }
1122                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1123                    map.remove(&tx_id);
1124                }
1125                session.cancel(&tx_id);
1126            }
1127            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1128                let tx_id: Arc<str> = tx.into();
1129                log::trace!("View window update: {}", tx_id);
1130                session.update_view_window(&tx_id, window);
1131            }
1132
1133            MykoMessage::Report(wrapped) => {
1134                // Extract tx from the report JSON
1135                let tx_id: Arc<str> = wrapped
1136                    .report
1137                    .get("tx")
1138                    .and_then(|v| v.as_str())
1139                    .unwrap_or("unknown")
1140                    .into();
1141                let report_id = &wrapped.report_id;
1142
1143                log::trace!(
1144                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1145                    session.client_id,
1146                    tx_id,
1147                    report_id,
1148                    session.subscription_count()
1149                );
1150
1151                // Look up the report registration
1152                if let Some(report_data) = handler_registry.get_report(report_id) {
1153                    // Parse the report JSON to the concrete type
1154                    let parsed = (report_data.parse)(wrapped.report.clone());
1155                    match parsed {
1156                        Ok(any_report) => {
1157                            let request_context = Arc::new(RequestContext::from_client(
1158                                tx_id.clone(),
1159                                session.client_id.clone(),
1160                                host_id,
1161                            ));
1162
1163                            // Create the cell using the factory (with host_id for context)
1164                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1165                                Ok(cell) => {
1166                                    session.subscribe_report(
1167                                        tx_id,
1168                                        report_id.as_str().into(),
1169                                        cell,
1170                                    );
1171                                }
1172                                Err(e) => {
1173                                    log::error!(
1174                                        "Failed to create report cell for {}: {}",
1175                                        report_id,
1176                                        e
1177                                    );
1178                                }
1179                            }
1180                        }
1181                        Err(e) => {
1182                            log::error!(
1183                                "Failed to parse report {}: {} | payload: {}",
1184                                report_id,
1185                                e,
1186                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1187                            );
1188                        }
1189                    }
1190                } else {
1191                    log::warn!("No registered handler for report: {}", report_id);
1192                }
1193            }
1194
1195            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1196                log::trace!(
1197                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1198                    session.client_id,
1199                    tx_id,
1200                    session.subscription_count()
1201                );
1202                session.cancel(&tx_id.into());
1203            }
1204
1205            MykoMessage::Event(mut event) => {
1206                record_ws_ingest(std::slice::from_ref(&event));
1207                event.sanitize_null_bytes();
1208                normalize_incoming_event(&mut event, &session.client_id, host_id);
1209                if let Err(e) = ctx.apply_event(event) {
1210                    log::error!(
1211                        "Failed to apply event from client {}: {e}",
1212                        session.client_id
1213                    );
1214                }
1215            }
1216
1217            MykoMessage::EventBatch(mut events) => {
1218                record_ws_ingest(&events);
1219                let incoming = events.len();
1220                if incoming >= 64 {
1221                    log::trace!(
1222                        "Received event batch from client {} size={}",
1223                        session.client_id,
1224                        incoming
1225                    );
1226                }
1227                for event in &mut events {
1228                    event.sanitize_null_bytes();
1229                    normalize_incoming_event(event, &session.client_id, host_id);
1230                }
1231                match ctx.apply_event_batch(events) {
1232                    Ok(applied) => {
1233                        log::trace!(
1234                            "Applied event batch from client {} size={}",
1235                            session.client_id,
1236                            applied
1237                        );
1238                    }
1239                    Err(e) => {
1240                        log::error!(
1241                            "Failed to apply event batch from client {}: {}",
1242                            session.client_id,
1243                            e
1244                        );
1245                    }
1246                }
1247            }
1248
1249            MykoMessage::Command(wrapped) => {
1250                // Extract tx from the command JSON
1251                let tx_id: Arc<str> = wrapped
1252                    .command
1253                    .get("tx")
1254                    .and_then(|v| v.as_str())
1255                    .unwrap_or("unknown")
1256                    .into();
1257
1258                let command_id = &wrapped.command_id;
1259
1260                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1261                let received_at = Instant::now();
1262                if let Ok(mut map) = command_started_by_tx.lock() {
1263                    map.insert(tx_id.clone(), received_at);
1264                }
1265                if let Err(e) = command_tx.send(CommandJob {
1266                    tx_id: tx_id.clone(),
1267                    command_id: wrapped.command_id.clone(),
1268                    command: wrapped.command.clone(),
1269                    received_at,
1270                }) {
1271                    log::error!(
1272                        "Failed to enqueue command {} for client {} tx={}: {}",
1273                        command_id,
1274                        session.client_id,
1275                        tx_id,
1276                        e
1277                    );
1278                    let error = MykoMessage::CommandError(CommandError {
1279                        tx: tx_id.to_string(),
1280                        command_id: command_id.to_string(),
1281                        message: "Command queue unavailable".to_string(),
1282                    });
1283                    if let Err(err) = priority_tx.try_send(error) {
1284                        drop_logger.on_drop("CommandError", &err);
1285                    }
1286                }
1287            }
1288
1289            MykoMessage::Ping(PingData { id, timestamp }) => {
1290                // Echo back the ping data
1291                let pong = MykoMessage::Ping(PingData { id, timestamp });
1292                if let Err(e) = priority_tx.try_send(pong) {
1293                    drop_logger.on_drop("Ping", &e);
1294                }
1295            }
1296
1297            // Response messages - these shouldn't come from clients.
1298            MykoMessage::QueryResponse(resp) => {
1299                log::warn!(
1300                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1301                    session.client_id,
1302                    resp.tx,
1303                    resp.sequence,
1304                    resp.upserts.len(),
1305                    resp.deletes.len(),
1306                    session.subscription_count()
1307                );
1308            }
1309            MykoMessage::QueryError(err) => {
1310                log::warn!(
1311                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1312                    session.client_id,
1313                    err.tx,
1314                    err.query_id,
1315                    err.message,
1316                    session.subscription_count()
1317                );
1318            }
1319            MykoMessage::ViewResponse(resp) => {
1320                log::warn!(
1321                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1322                    session.client_id,
1323                    resp.tx,
1324                    resp.sequence,
1325                    resp.upserts.len(),
1326                    resp.deletes.len(),
1327                    session.subscription_count()
1328                );
1329            }
1330            MykoMessage::ViewError(err) => {
1331                log::warn!(
1332                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1333                    session.client_id,
1334                    err.tx,
1335                    err.view_id,
1336                    err.message,
1337                    session.subscription_count()
1338                );
1339            }
1340            MykoMessage::ReportResponse(resp) => {
1341                log::warn!(
1342                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1343                    session.client_id,
1344                    resp.tx,
1345                    session.subscription_count()
1346                );
1347            }
1348            MykoMessage::ReportError(err) => {
1349                log::warn!(
1350                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1351                    session.client_id,
1352                    err.tx,
1353                    err.report_id,
1354                    err.message,
1355                    session.subscription_count()
1356                );
1357            }
1358            MykoMessage::CommandResponse(resp) => {
1359                if resp.tx.trim().is_empty() {
1360                    log::warn!(
1361                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1362                        session.client_id,
1363                        session.subscription_count()
1364                    );
1365                } else {
1366                    let correlated = outbound_commands_by_tx
1367                        .lock()
1368                        .ok()
1369                        .and_then(|mut map| map.remove(&resp.tx));
1370                    if let Some((command_id, started)) = correlated {
1371                        // Success-path per-command roundtrip detail: fires once
1372                        // per correlated command response (ExecTargetAction etc.),
1373                        // tens of thousands per second under load. Keep at trace;
1374                        // the unmatched/error paths below stay at warn.
1375                        log::trace!(
1376                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1377                            session.client_id,
1378                            resp.tx,
1379                            command_id,
1380                            started.elapsed().as_millis(),
1381                            session.subscription_count()
1382                        );
1383                    } else {
1384                        log::warn!(
1385                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1386                            session.client_id,
1387                            resp.tx,
1388                            session.subscription_count()
1389                        );
1390                    }
1391                }
1392            }
1393            MykoMessage::CommandError(err) => {
1394                if err.tx.trim().is_empty() {
1395                    log::warn!(
1396                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1397                        session.client_id,
1398                        err.command_id,
1399                        err.message,
1400                        session.subscription_count()
1401                    );
1402                } else {
1403                    let correlated = outbound_commands_by_tx
1404                        .lock()
1405                        .ok()
1406                        .and_then(|mut map| map.remove(&err.tx));
1407                    if let Some((command_id, started)) = correlated {
1408                        log::warn!(
1409                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1410                            session.client_id,
1411                            err.tx,
1412                            err.command_id,
1413                            command_id,
1414                            err.message,
1415                            started.elapsed().as_millis(),
1416                            session.subscription_count()
1417                        );
1418                    } else {
1419                        log::warn!(
1420                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1421                            session.client_id,
1422                            err.tx,
1423                            err.command_id,
1424                            err.message,
1425                            session.subscription_count()
1426                        );
1427                    }
1428                }
1429            }
1430            MykoMessage::Benchmark(payload) => {
1431                let stats = ws_benchmark_stats();
1432                ensure_ws_benchmark_logger();
1433                stats.message_count.fetch_add(1, Ordering::Relaxed);
1434                // Estimate payload size from the JSON value
1435                let size = payload.to_string().len() as u64;
1436                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1437            }
1438        }
1439
1440        Ok(())
1441    }
1442
1443    fn execute_command_job(
1444        ctx: Arc<CellServerCtx>,
1445        priority_tx: &mpsc::Sender<MykoMessage>,
1446        drop_logger: &DropLogger,
1447        client_id: Arc<str>,
1448        job: CommandJob,
1449    ) {
1450        let host_id = ctx.host_id;
1451        let started = Instant::now();
1452        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1453        let command_id = job.command_id.clone();
1454
1455        let mut handler_found = false;
1456        for registration in inventory::iter::<CommandHandlerRegistration> {
1457            if registration.command_id == command_id {
1458                handler_found = true;
1459                let executor = (registration.factory)();
1460
1461                let req = Arc::new(RequestContext::from_client(
1462                    job.tx_id.clone(),
1463                    client_id.clone(),
1464                    host_id,
1465                ));
1466                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1467                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1468                let execute_started = Instant::now();
1469
1470                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1471                    Ok(result) => {
1472                        let response = MykoMessage::CommandResponse(CommandResponse {
1473                            response: result,
1474                            tx: job.tx_id.to_string(),
1475                        });
1476                        if let Err(e) = priority_tx.try_send(response) {
1477                            drop_logger.on_drop("CommandResponse", &e);
1478                        }
1479                    }
1480                    Err(e) => {
1481                        let error = MykoMessage::CommandError(CommandError {
1482                            tx: job.tx_id.to_string(),
1483                            command_id: command_id.clone(),
1484                            message: e.message,
1485                        });
1486                        if let Err(err) = priority_tx.try_send(error) {
1487                            drop_logger.on_drop("CommandError", &err);
1488                        }
1489                    }
1490                }
1491                let execute_ms = execute_started.elapsed().as_millis();
1492                let total_ms = job.received_at.elapsed().as_millis();
1493                log::trace!(
1494                    target: "myko_server::ws_perf",
1495                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1496                    client_id,
1497                    job.tx_id,
1498                    command_id,
1499                    queue_wait_ms,
1500                    execute_ms,
1501                    total_ms
1502                );
1503                break;
1504            }
1505        }
1506
1507        if !handler_found {
1508            log::warn!("No registered handler for command: {}", command_id);
1509            let error = MykoMessage::CommandError(CommandError {
1510                tx: job.tx_id.to_string(),
1511                command_id: command_id.clone(),
1512                message: format!("Command handler not found: {}", command_id),
1513            });
1514            if let Err(e) = priority_tx.try_send(error) {
1515                drop_logger.on_drop("CommandError", &e);
1516            }
1517        }
1518
1519        if !handler_found {
1520            log::debug!(
1521                target: "myko_server::ws_perf",
1522                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1523                client_id,
1524                job.tx_id,
1525                command_id,
1526                queue_wait_ms,
1527                job.received_at.elapsed().as_millis()
1528            );
1529        }
1530    }
1531}
1532
1533/// Channel-based WebSocket writer.
1534///
1535/// Sends messages through an mpsc channel which are then
1536/// forwarded to the actual WebSocket.
1537struct ChannelWriter {
1538    tx: mpsc::Sender<OutboundMessage>,
1539    deferred_tx: mpsc::Sender<DeferredOutbound>,
1540    drop_logger: Arc<DropLogger>,
1541    outgoing_format: Arc<AtomicU8>,
1542}
1543
1544impl ChannelWriter {
1545    /// Cheap "is the writer task gone?" check used to short-circuit
1546    /// subscriber callbacks for a disconnected client. `Sender::is_closed`
1547    /// returns true once the matching receiver is dropped, which happens as
1548    /// soon as the write task exits. Avoiding the work here prevents the
1549    /// "buffer full / channel closed" log storm we used to see for 10+
1550    /// seconds after every disconnect while the session was still tearing
1551    /// down its subscription guards.
1552    #[inline]
1553    fn tx_dead(&self) -> bool {
1554        self.tx.is_closed()
1555    }
1556
1557    #[inline]
1558    fn deferred_dead(&self) -> bool {
1559        self.deferred_tx.is_closed()
1560    }
1561}
1562
1563impl WsWriter for ChannelWriter {
1564    fn send(&self, msg: MykoMessage) {
1565        // Fast path: writer is gone. Don't try to send and don't log; the
1566        // dead-channel state is expected after the client disconnects, and
1567        // a dropped subscription will follow shortly when the session
1568        // teardown finishes. Avoiding the log+counter prevents the
1569        // "buffer full / channel closed" warning storm we used to see for
1570        // 10+ seconds after every disconnect.
1571        if self.tx_dead() {
1572            return;
1573        }
1574        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1575            // Closed errors here race with the writer task exiting between
1576            // the is_dead check and the try_send; suppress them too.
1577            if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1578                self.drop_logger.on_drop("message", &e);
1579            }
1580        }
1581    }
1582
1583    fn protocol(&self) -> MykoProtocol {
1584        MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1585    }
1586
1587    fn send_serialized_command(
1588        &self,
1589        tx: Arc<str>,
1590        command_id: String,
1591        payload: EncodedCommandMessage,
1592    ) {
1593        if self.tx_dead() {
1594            return;
1595        }
1596        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1597            tx,
1598            command_id,
1599            payload,
1600        }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1601        {
1602            self.drop_logger.on_drop("serialized_command", &e);
1603        }
1604    }
1605
1606    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1607        if self.deferred_dead() {
1608            return;
1609        }
1610        if let Err(e) = self
1611            .deferred_tx
1612            .try_send(DeferredOutbound::Report(tx, output))
1613            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1614        {
1615            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1616        }
1617    }
1618
1619    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1620        if self.deferred_dead() {
1621            return;
1622        }
1623        if let Err(e) = self
1624            .deferred_tx
1625            .try_send(DeferredOutbound::Query { response, is_view })
1626            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1627        {
1628            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1629        }
1630    }
1631}
1632
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh shared format cell initialised to JSON.
    fn json_format() -> Arc<AtomicU8> {
        Arc::new(AtomicU8::new(MykoProtocol::JSON as u8))
    }

    /// Builds a `ChannelWriter` over small bounded channels and returns it
    /// with both receivers, so each test decides which ends stay alive.
    fn writer_with_receivers(
        outgoing_format: Arc<AtomicU8>,
    ) -> (
        ChannelWriter,
        mpsc::Receiver<OutboundMessage>,
        mpsc::Receiver<DeferredOutbound>,
    ) {
        let (tx, rx) = mpsc::channel(10);
        let (deferred_tx, deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format,
        };
        (writer, rx, deferred_rx)
    }

    /// Minimal message used as a payload in the send tests.
    fn ping() -> MykoMessage {
        MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        })
    }

    #[test]
    fn test_channel_writer() {
        let (writer, mut rx, _deferred_rx) = writer_with_receivers(json_format());

        writer.send(ping());

        let received = rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    #[test]
    fn send_is_silently_ignored_after_receiver_drops() {
        let (writer, rx, _deferred_rx) = writer_with_receivers(json_format());

        // Dropping the receiver is what happens when the write task exits.
        drop(rx);
        assert!(writer.tx_dead());

        // Must take the fast path and return without panicking.
        writer.send(ping());
    }

    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        // Exercise the real `protocol()` accessor through a writer that
        // shares the atomic, rather than reading a bare `AtomicU8`.
        let outgoing_format = json_format();
        let (writer, _rx, _deferred_rx) = writer_with_receivers(Arc::clone(&outgoing_format));

        // Initially JSON.
        assert_eq!(writer.protocol(), MykoProtocol::JSON);

        // Simulate receiving a binary frame: promote.
        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(writer.protocol(), MykoProtocol::CBOR);

        // Simulate receiving more text frames after promotion: no change.
        // (The handler in the read loop only writes on Binary, never on Text,
        // so last-write-wins on the shared cell gives stickiness for free.)
        assert_eq!(writer.protocol(), MykoProtocol::CBOR);
    }
}