Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use futures_util::{SinkExt, StreamExt};
17use hyphae::SelectExt;
18use myko::{
19    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
20    client::MykoProtocol,
21    command::{CommandContext, CommandHandlerRegistration},
22    entities::client::{Client, ClientId},
23    relationship::{
24        iter_client_id_registrations, iter_fallback_to_id_registrations,
25        iter_server_owned_registrations,
26    },
27    report::AnyOutput,
28    request::RequestContext,
29    server::{
30        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
31        client_registry::try_client_registry,
32    },
33    wire::{
34        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
35        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
36    },
37};
38use tokio::{net::TcpStream, sync::mpsc, time::interval};
39use tokio_tungstenite::{
40    accept_async_with_config,
41    tungstenite::{Message, protocol::WebSocketConfig},
42};
43use uuid::Uuid;
44
/// Process-wide WebSocket benchmark counters.
///
/// The `ws-benchmark-logger` thread swaps both counters back to zero once per
/// second after reading them, so each value covers roughly the last second.
#[derive(Default)]
struct WsBenchmarkStats {
    /// Number of messages recorded since the counters were last drained.
    message_count: AtomicU64,
    /// Total payload bytes recorded since the counters were last drained.
    total_bytes: AtomicU64,
}

/// Lazily-created shared counters handed out by `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Set once the benchmark logger thread has been (attempted to be) started.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);

/// Returns a handle to the process-wide benchmark counters, creating them
/// (zero-initialized) on first use. Every call returns the same allocation.
fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
    let shared = WS_BENCHMARK_STATS.get_or_init(|| Arc::new(WsBenchmarkStats::default()));
    Arc::clone(shared)
}
63
64fn ensure_ws_benchmark_logger() {
65    if WS_BENCHMARK_LOGGER_STARTED
66        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
67        .is_err()
68    {
69        return;
70    }
71
72    let stats = ws_benchmark_stats();
73    thread::Builder::new()
74        .name("ws-benchmark-logger".into())
75        .spawn(move || {
76            loop {
77                thread::sleep(Duration::from_secs(1));
78
79                let count = stats.message_count.swap(0, Ordering::Relaxed);
80                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
81
82                if count == 0 {
83                    continue;
84                }
85
86                log::info!(
87                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
88                    count,
89                    bytes,
90                    bytes / count
91                );
92            }
93        })
94        .expect("failed to spawn websocket benchmark logger thread");
95}
96
97fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
98    if event.change_type != MEventType::SET {
99        return;
100    }
101
102    // Auto-populate #[myko_client_id] fields with the connection's client_id
103    for reg in iter_client_id_registrations() {
104        if reg.entity_type == event.item_type {
105            if let Some(obj) = event.item.as_object_mut() {
106                obj.insert(
107                    reg.field_name_json.to_string(),
108                    serde_json::Value::String(client_id.to_string()),
109                );
110            }
111            break;
112        }
113    }
114
115    // Auto-populate #[server_owned] fields with this server's ID
116    for reg in iter_server_owned_registrations() {
117        if reg.entity_type == event.item_type {
118            if let Some(obj) = event.item.as_object_mut() {
119                let field = reg.field_name_json;
120                let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
121                if current.is_empty() {
122                    obj.insert(
123                        field.to_string(),
124                        serde_json::Value::String(host_id.to_string()),
125                    );
126                }
127            }
128            break;
129        }
130    }
131
132    // Auto-populate #[fallback_to_id] fields with the entity's own id
133    // if the field is null or missing.
134    if let Some(obj) = event.item.as_object_mut()
135        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
136    {
137        for reg in iter_fallback_to_id_registrations() {
138            if reg.entity_type == event.item_type {
139                let field = reg.field_name_json;
140                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
141                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
142                }
143            }
144        }
145    }
146}
147
148/// Per-connection drop tracking to avoid log storms when clients fall behind.
149///
150/// When the outbound channel is full, we will drop messages (same as today),
151/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
152struct DropLogger {
153    client_id: Arc<str>,
154    dropped: std::sync::atomic::AtomicU64,
155    last_log_ms: std::sync::atomic::AtomicU64,
156}
157
158impl DropLogger {
159    fn new(client_id: Arc<str>) -> Self {
160        Self {
161            client_id,
162            dropped: std::sync::atomic::AtomicU64::new(0),
163            last_log_ms: std::sync::atomic::AtomicU64::new(0),
164        }
165    }
166
167    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
168        use std::sync::atomic::Ordering;
169
170        self.dropped.fetch_add(1, Ordering::Relaxed);
171
172        // Log at most once per second per connection.
173        let now_ms = std::time::SystemTime::now()
174            .duration_since(std::time::UNIX_EPOCH)
175            .unwrap_or_default()
176            .as_millis() as u64;
177        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
178        if now_ms.saturating_sub(last_ms) < 1000 {
179            return;
180        }
181
182        if self
183            .last_log_ms
184            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
185            .is_err()
186        {
187            return;
188        }
189
190        let n = self.dropped.swap(0, Ordering::Relaxed);
191        log::warn!(
192            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
193            n,
194            self.client_id,
195            kind,
196            err
197        );
198    }
199}
200
201struct CommandJob {
202    tx_id: Arc<str>,
203    command_id: String,
204    command: serde_json::Value,
205    received_at: Instant,
206}
207
208/// Result of an async subscription build (query or view cell_factory).
209enum SubscriptionReady {
210    Query {
211        tx_id: Arc<str>,
212        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
213        window: Option<myko::wire::QueryWindow>,
214    },
215    View {
216        tx_id: Arc<str>,
217        view_id: Arc<str>,
218        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
219        window: Option<myko::wire::QueryWindow>,
220    },
221}
222
223enum OutboundMessage {
224    Message(MykoMessage),
225    SerializedCommand {
226        tx: Arc<str>,
227        command_id: String,
228        payload: EncodedCommandMessage,
229    },
230}
231
232enum DeferredOutbound {
233    Report(Arc<str>, Arc<dyn AnyOutput>),
234    Query {
235        response: PendingQueryResponse,
236        is_view: bool,
237    },
238}
239
/// Entry point for serving a single myko WebSocket client connection.
///
/// A stateless unit type. All per-connection state is created and owned by
/// the connection-handling associated functions.
pub struct WsHandler;
242
243impl WsHandler {
244    /// Handle a new WebSocket connection (performs the handshake).
245    pub async fn handle_connection(
246        stream: TcpStream,
247        addr: SocketAddr,
248        ctx: Arc<CellServerCtx>,
249    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
250        let mut ws_config = WebSocketConfig::default();
251        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
252        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
253        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
254        Self::handle_upgraded(ws_stream, addr, ctx).await
255    }
256
257    /// Handle a WebSocket connection whose HTTP/1.1 handshake has already
258    /// completed and produced a [`tokio_tungstenite::WebSocketStream`].
259    ///
260    /// Used by the front-door router when it pre-parses the HTTP request
261    /// (to dispatch between `/myko` WS and `/myko/mcp` HTTP/WS) and then
262    /// completes the WS handshake itself.
263    #[allow(clippy::too_many_arguments)]
264    pub async fn handle_upgraded(
265        ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
266        addr: SocketAddr,
267        ctx: Arc<CellServerCtx>,
268    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
269        let host_id = ctx.host_id;
270
271        let (mut write, mut read) = ws_stream.split();
272
273        // Create a bounded channel for sending messages to the client
274        // High limit (10k) since we have good memory availability
275        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
276        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
277        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
278        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
279        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
280
281        // Outgoing format for this session: defaults to JSON, sticky-promotes
282        // to CBOR on the first received binary frame. Never demotes.
283        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
284
285        // Create client session with channel-based writer
286        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
287        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
288        let writer = ChannelWriter {
289            tx: tx.clone(),
290            deferred_tx: deferred_tx.clone(),
291            drop_logger: drop_logger.clone(),
292            outgoing_format: outgoing_format.clone(),
293        };
294
295        // Register writer in the global client registry (if initialized)
296        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
297            tx: tx.clone(),
298            deferred_tx: deferred_tx.clone(),
299            drop_logger: drop_logger.clone(),
300            outgoing_format: outgoing_format.clone(),
301        });
302        if let Some(registry) = try_client_registry() {
303            registry.register(client_id.clone(), writer_arc);
304        }
305
306        let mut session = ClientSession::new(client_id.clone(), writer);
307
308        let outgoing_format_writer = outgoing_format.clone();
309        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
310            Arc::new(Mutex::new(HashMap::new()));
311        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
312            Arc::new(Mutex::new(HashMap::new()));
313        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
314            Arc::new(Mutex::new(HashMap::new()));
315        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
316            Arc::new(Mutex::new(HashMap::new()));
317        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
318            Arc::new(Mutex::new(HashMap::new()));
319
320        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
321
322        // Publish Client entity
323        let client_entity = Client {
324            id: ClientId(client_id.clone()),
325            server_id: host_id.to_string().into(),
326            address: Some(Arc::from(addr.to_string())),
327            windback: None,
328        };
329        if let Err(e) = ctx.set(&client_entity) {
330            log::error!("Failed to persist client entity: {e}");
331        }
332
333        log::info!("Client connected: {} from {}", client_id, addr);
334
335        let write_ctx = ctx.clone();
336        let write_client_id = client_id.clone();
337        let write_addr = addr;
338        let command_ctx = ctx.clone();
339        let command_priority_tx = priority_tx.clone();
340        let command_drop_logger = drop_logger.clone();
341        let command_client_id = client_id.clone();
342
343        // Spawn task to forward messages from channel to WebSocket
344        let write_task = tokio::spawn(async move {
345            let _ctx = write_ctx;
346            let mut normal_open = true;
347            let mut priority_open = true;
348            let mut deferred_open = true;
349            while normal_open || priority_open || deferred_open {
350                let msg = tokio::select! {
351                    biased;
352                    maybe = priority_rx.recv(), if priority_open => {
353                        match maybe {
354                            Some(msg) => OutboundMessage::Message(msg),
355                            None => {
356                                priority_open = false;
357                                continue;
358                            }
359                        }
360                    }
361                    maybe = deferred_rx.recv(), if deferred_open => {
362                        match maybe {
363                            Some(DeferredOutbound::Report(tx, output)) => {
364                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
365                                    response: output.to_value(),
366                                    tx: tx.to_string(),
367                                }))
368                            }
369                            Some(DeferredOutbound::Query { response, is_view }) => {
370                                if is_view {
371                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
372                                } else {
373                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
374                                }
375                            }
376                            None => {
377                                deferred_open = false;
378                                continue;
379                            }
380                        }
381                    }
382                    maybe = rx.recv(), if normal_open => {
383                        match maybe {
384                            Some(msg) => msg,
385                            None => {
386                                normal_open = false;
387                                continue;
388                            }
389                        }
390                    }
391                };
392                // Per-message timing tap (outbound). Counts by kind into the
393                // global ws_timing instrumentation. Counterpart to the inbound
394                // tap in `handle_message`.
395                match &msg {
396                    OutboundMessage::SerializedCommand { .. } => {
397                        crate::ws_timing::record_outbound("Command")
398                    }
399                    OutboundMessage::Message(m) => {
400                        crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
401                    }
402                };
403                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
404                    OutboundMessage::SerializedCommand { tx, .. } => {
405                        ("command", Some(tx.clone()), None, None, None, None)
406                    }
407                    OutboundMessage::Message(msg) => match msg {
408                        MykoMessage::ViewResponse(r) => (
409                            "view_response",
410                            Some(r.tx.clone()),
411                            Some(r.sequence),
412                            Some(r.upserts.len()),
413                            Some(r.deletes.len()),
414                            r.total_count,
415                        ),
416                        MykoMessage::QueryResponse(r) => (
417                            "query_response",
418                            Some(r.tx.clone()),
419                            Some(r.sequence),
420                            Some(r.upserts.len()),
421                            Some(r.deletes.len()),
422                            r.total_count,
423                        ),
424                        MykoMessage::CommandResponse(r) => (
425                            "command_response",
426                            Some(Arc::<str>::from(r.tx.clone())),
427                            None,
428                            None,
429                            None,
430                            None,
431                        ),
432                        MykoMessage::CommandError(r) => (
433                            "command_error",
434                            Some(Arc::<str>::from(r.tx.clone())),
435                            None,
436                            None,
437                            None,
438                            None,
439                        ),
440                        _ => ("other", None, None, None, None, None),
441                    },
442                };
443
444                match &msg {
445                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
446                        if !tx.trim().is_empty()
447                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
448                        {
449                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
450                        }
451                    }
452                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
453                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
454                            && !tx_id.trim().is_empty()
455                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
456                        {
457                            map.insert(
458                                tx_id.to_string(),
459                                (wrapped.command_id.clone(), Instant::now()),
460                            );
461                        }
462                    }
463                    _ => {}
464                }
465
466                let ws_msg = match &msg {
467                    OutboundMessage::SerializedCommand {
468                        payload: EncodedCommandMessage::Json(json),
469                        ..
470                    } => Message::Text(json.clone().into()),
471                    OutboundMessage::SerializedCommand {
472                        payload: EncodedCommandMessage::Cbor(bytes),
473                        ..
474                    } => Message::Binary(bytes.clone().into()),
475                    OutboundMessage::Message(msg)
476                        if outgoing_format_writer.load(Ordering::SeqCst)
477                            == MykoProtocol::CBOR as u8 =>
478                    {
479                        let mut bytes = Vec::new();
480                        match ciborium::ser::into_writer(msg, &mut bytes) {
481                            Ok(()) => Message::Binary(bytes.into()),
482                            Err(e) => {
483                                log::error!("Failed to serialize message to CBOR: {}", e);
484                                continue;
485                            }
486                        }
487                    }
488                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
489                        Ok(json) => Message::Text(json.into()),
490                        Err(e) => {
491                            log::error!("Failed to serialize message to JSON: {}", e);
492                            continue;
493                        }
494                    },
495                };
496                let payload_bytes = match &ws_msg {
497                    Message::Binary(b) => b.len(),
498                    Message::Text(t) => t.len(),
499                    _ => 0,
500                };
501
502                if let Err(err) = write.send(ws_msg).await {
503                    log::error!(
504                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
505                        write_client_id,
506                        write_addr,
507                        kind,
508                        tx_id,
509                        seq,
510                        payload_bytes,
511                        outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
512                        err
513                    );
514                    break;
515                }
516            }
517            // NOTE(ts): Unregister from client registry immediately so the node
518            // executor stops serializing commands into a dead channel.
519            if let Some(registry) = try_client_registry() {
520                registry.unregister(&write_client_id);
521                log::info!(
522                    "WebSocket writer unregistered client {} from {} (write task exiting)",
523                    write_client_id,
524                    write_addr,
525                );
526            }
527            log::warn!(
528                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
529                write_client_id,
530                write_addr,
531                normal_open,
532                priority_open,
533                deferred_open
534            );
535        });
536
537        // Execute commands on a dedicated worker so ping/cancel traffic is never
538        // blocked by long-running command handlers.
539        let command_started_cleanup = command_started_by_tx.clone();
540        let command_task = tokio::spawn(async move {
541            while let Some(job) = command_rx.recv().await {
542                let command_ctx = command_ctx.clone();
543                let command_priority_tx = command_priority_tx.clone();
544                let command_drop_logger = command_drop_logger.clone();
545                let command_client_id = command_client_id.clone();
546                let tx_id = job.tx_id.clone();
547                let started_map = command_started_cleanup.clone();
548                match tokio::task::spawn_blocking(move || {
549                    Self::execute_command_job(
550                        command_ctx,
551                        &command_priority_tx,
552                        command_drop_logger.as_ref(),
553                        command_client_id,
554                        job,
555                    );
556                })
557                .await
558                {
559                    Ok(()) => {}
560                    Err(e) => {
561                        log::error!("Command worker panicked: {}", e);
562                    }
563                }
564                // NOTE(ts): Clean up timing entry after command completes (success or panic).
565                if let Ok(mut map) = started_map.lock() {
566                    map.remove(&tx_id);
567                }
568            }
569        });
570
571        // Process incoming messages and completed subscription builds concurrently.
572        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
573        // so they don't block command processing or other messages.
574        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
575        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
576        loop {
577            tokio::select! {
578                // Completed subscription builds — register with session
579                Some(ready) = subscribe_rx.recv() => {
580                    let tx_id = match &ready {
581                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
582                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
583                    };
584                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
585                        map.remove(&tx_id);
586                    }
587                    match ready {
588                        SubscriptionReady::Query { tx_id, cellmap, window } => {
589                            session.subscribe_query(tx_id, cellmap, window);
590                        }
591                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
592                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
593                        }
594                    }
595                }
596                // NOTE(ts): Sweep outbound command entries older than 10s.
597                // Responses normally arrive quickly; stale entries are from
598                // dropped connections or commands that will never get a response.
599                _ = outbound_ttl_interval.tick() => {
600                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
601                        let before = map.len();
602                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
603                        let removed = before - map.len();
604                        if removed > 0 {
605                            log::debug!(
606                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
607                                session.client_id,
608                                removed,
609                                map.len()
610                            );
611                        }
612                    }
613                }
614                // Incoming WebSocket messages
615                msg = read.next() => {
616                    let Some(msg) = msg else { break };
617                    let ctx = ctx.clone();
618                    let msg = match msg {
619                        Ok(m) => m,
620                        Err(e) => {
621                            log::error!("WebSocket read error from {}: {}", client_id, e);
622                            break;
623                        }
624                    };
625
626                    match msg {
627                        Message::Binary(data) => {
628                            if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
629                                log::debug!(
630                                    "Client {} promoted outgoing format to CBOR via demonstration",
631                                    client_id
632                                );
633                                outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
634                            }
635
636                            match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
637                                Ok(myko_msg) => {
638                                    if let Err(e) = Self::handle_message(
639                                        &mut session,
640                                        ctx,
641                                        &priority_tx,
642                                        &drop_logger,
643                                        &query_ids_by_tx,
644                                        &view_ids_by_tx,
645                                        &subscribe_started_by_tx,
646                                        &command_started_by_tx,
647                                        &outbound_commands_by_tx,
648                                        &command_tx,
649                                        &subscribe_tx,
650                                        myko_msg,
651                                    ) {
652                                        log::error!("Error handling message: {}", e);
653                                    }
654                                    tokio::task::yield_now().await;
655                                }
656                                Err(e) => {
657                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
658                                }
659                            }
660                        }
661                        Message::Text(text) => {
662                            match serde_json::from_str::<MykoMessage>(&text) {
663                                Ok(myko_msg) => {
664                                    if let Err(e) = Self::handle_message(
665                                        &mut session,
666                                        ctx,
667                                        &priority_tx,
668                                        &drop_logger,
669                                        &query_ids_by_tx,
670                                        &view_ids_by_tx,
671                                        &subscribe_started_by_tx,
672                                        &command_started_by_tx,
673                                        &outbound_commands_by_tx,
674                                        &command_tx,
675                                        &subscribe_tx,
676                                        myko_msg,
677                                    ) {
678                                        log::error!("Error handling message: {}", e);
679                                    }
680                                    tokio::task::yield_now().await;
681                                }
682                                Err(e) => {
683                                    log::warn!(
684                                        "Failed to parse JSON message from {}: {} | raw: {}",
685                                        client_id,
686                                        e,
687                                        if text.len() > 1000 {
688                                            &text[..1000]
689                                        } else {
690                                            &text
691                                        }
692                                    );
693                                }
694                            }
695                        }
696                        Message::Ping(data) => {
697                            log::trace!("Ping from {}", client_id);
698                            let _ = data;
699                        }
700                        Message::Pong(_) => {
701                            log::trace!("Pong from {}", client_id);
702                        }
703                        Message::Close(frame) => {
704                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
705                            break;
706                        }
707                        Message::Frame(_) => {}
708                    }
709                }
710            }
711        }
712
713        // Cleanup. Order matters:
714        // 1. Abort the write/command tasks first so their channel receivers
715        //    are dropped — `ChannelWriter` clones held by subscriber
716        //    callbacks then short-circuit via `tx_dead()` / `deferred_dead()`
717        //    instead of doing useless work and emitting log spam while we
718        //    tear down the session.
719        // 2. Unregister the client writer from the global registry so other
720        //    parts of the system stop dispatching commands here.
721        // 3. Move the session drop to a blocking thread. `drop(session)`
722        //    cancels each subscription guard, and hyphae's per-cell
723        //    unsubscribe is O(N) in the cell's subscriber list (Vec clone +
724        //    filter + ArcSwap store), so a session with thousands of
725        //    subscriptions can take significant CPU time. Doing it on the
726        //    async runtime would block other connections' tasks; the
727        //    blocking pool is the right place for this.
728        write_task.abort();
729        command_task.abort();
730        if let Some(registry) = try_client_registry() {
731            registry.unregister(&client_id);
732        }
733
734        let drop_client_id = client_id.clone();
735        tokio::task::spawn_blocking(move || {
736            drop(session); // Drops all subscription guards
737            log::trace!(
738                "Client session subscriptions torn down for {}",
739                drop_client_id
740            );
741        });
742
743        // Delete Client entity
744        if let Err(e) = ctx.del(&client_entity) {
745            log::error!("Failed to delete client entity: {e}");
746        }
747
748        log::info!("Client disconnected: {} from {}", client_id, addr);
749
750        Ok(())
751    }
752
753    /// Handle a parsed MykoMessage.
754    #[allow(clippy::too_many_arguments)]
755    fn handle_message<W: WsWriter>(
756        session: &mut ClientSession<W>,
757        ctx: Arc<CellServerCtx>,
758        priority_tx: &mpsc::Sender<MykoMessage>,
759        drop_logger: &Arc<DropLogger>,
760        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
761        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
762        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
763        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
764        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
765        command_tx: &mpsc::UnboundedSender<CommandJob>,
766        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
767        msg: MykoMessage,
768    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
769        // Per-message timing tap. Counts inbound messages by kind into the
770        // global instrumentation. Periodic summarizer logs the deltas.
771        crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
772
773        let handler_registry = ctx.handler_registry.clone();
774
775        let registry = ctx.registry.clone();
776
777        let host_id = ctx.host_id;
778
779        match msg {
780            MykoMessage::Query(wrapped) => {
781                // Extract tx from the query JSON
782                let tx_id: Arc<str> = wrapped
783                    .query
784                    .get("tx")
785                    .and_then(|v| v.as_str())
786                    .unwrap_or("unknown")
787                    .into();
788                let query_id = &wrapped.query_id;
789                let entity_type = &wrapped.query_item_type;
790
791                // Defensive dedupe: some clients can replay the same subscribe request.
792                // A duplicate for the same tx would reset server-side sequence/window state.
793                if session.has_subscription(&tx_id) {
794                    log::debug!(
795                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
796                        session.client_id,
797                        tx_id,
798                        query_id,
799                        entity_type
800                    );
801                    return Ok(());
802                }
803                if let Ok(mut map) = query_ids_by_tx.lock() {
804                    map.insert(tx_id.clone(), query_id.clone());
805                }
806                if let Ok(mut map) = subscribe_started_by_tx.lock() {
807                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
808                }
809
810                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
811                log::trace!(
812                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
813                    session.client_id,
814                    tx_id,
815                    query_id,
816                    entity_type,
817                    wrapped.window.is_some(),
818                    session.subscription_count()
819                );
820
821                let request_context = Arc::new(RequestContext::from_client(
822                    tx_id.clone(),
823                    session.client_id.clone(),
824                    host_id,
825                ));
826
827                if let Some(query_data) = handler_registry.get_query(query_id) {
828                    let parsed = (query_data.parse)(wrapped.query.clone());
829                    match parsed {
830                        Ok(any_query) => {
831                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
832                            // block command processing or other messages.
833                            let cell_factory = query_data.cell_factory;
834                            let registry = registry.clone();
835                            let request_context = request_context.clone();
836                            let ctx = ctx.clone();
837                            let window = wrapped.window.clone();
838                            let query_id = query_id.clone();
839                            let sub_tx = subscribe_tx.clone();
840                            tokio::task::spawn_blocking(move || {
841                                match cell_factory(any_query, registry, request_context, Some(ctx))
842                                {
843                                    Ok(filtered_cellmap) => {
844                                        let _ = sub_tx.send(SubscriptionReady::Query {
845                                            tx_id,
846                                            cellmap: filtered_cellmap,
847                                            window,
848                                        });
849                                    }
850                                    Err(e) => {
851                                        log::error!(
852                                            "Failed to create query cell for {}: {}",
853                                            query_id,
854                                            e
855                                        );
856                                    }
857                                }
858                            });
859                        }
860                        Err(e) => {
861                            log::error!(
862                                "Failed to parse query {}: {} | payload: {}",
863                                query_id,
864                                e,
865                                serde_json::to_string(&wrapped.query).unwrap_or_default()
866                            );
867                        }
868                    }
869                } else {
870                    // Fall back to select all for unknown queries
871                    log::warn!(
872                        "No registered query handler for {}, falling back to select all",
873                        query_id
874                    );
875                    let store: myko::store::EntityStore =
876                        (*registry.get_or_create(entity_type)).clone();
877                    let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
878                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
879                }
880            }
881
882            MykoMessage::View(wrapped) => {
883                let tx_id: Arc<str> = wrapped
884                    .view
885                    .get("tx")
886                    .and_then(|v| v.as_str())
887                    .unwrap_or("unknown")
888                    .into();
889                let view_id = &wrapped.view_id;
890                let item_type = &wrapped.view_item_type;
891
892                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
893                if session.has_subscription(&tx_id) {
894                    log::debug!(
895                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
896                        session.client_id,
897                        tx_id,
898                        view_id,
899                        item_type
900                    );
901                    return Ok(());
902                }
903                if let Ok(mut map) = view_ids_by_tx.lock() {
904                    map.insert(tx_id.clone(), view_id.clone());
905                }
906                if let Ok(mut map) = subscribe_started_by_tx.lock() {
907                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
908                }
909
910                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
911                log::trace!(
912                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
913                    session.client_id,
914                    tx_id,
915                    view_id,
916                    item_type,
917                    wrapped.window
918                );
919
920                let request_context = Arc::new(RequestContext::from_client(
921                    tx_id.clone(),
922                    session.client_id.clone(),
923                    host_id,
924                ));
925
926                if let Some(view_data) = handler_registry.get_view(view_id) {
927                    let parsed = (view_data.parse)(wrapped.view.clone());
928                    match parsed {
929                        Ok(any_view) => {
930                            log::trace!(
931                                "View parsed successfully client={} tx={} view_id={}",
932                                session.client_id,
933                                tx_id,
934                                view_id
935                            );
936                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
937                            // block command processing or other messages.
938                            let cell_factory = view_data.cell_factory;
939                            let registry = registry.clone();
940                            let ctx = ctx.clone();
941                            let window = wrapped.window.clone();
942                            let view_id_clone = view_id.clone();
943                            let sub_tx = subscribe_tx.clone();
944                            let priority_tx = priority_tx.clone();
945                            let drop_logger = drop_logger.clone();
946                            tokio::task::spawn_blocking(move || {
947                                match cell_factory(any_view, registry, request_context, ctx) {
948                                    Ok(filtered_cellmap) => {
949                                        log::trace!(
950                                            "View cell factory succeeded tx={} view_id={}",
951                                            tx_id,
952                                            view_id_clone
953                                        );
954                                        let _ = sub_tx.send(SubscriptionReady::View {
955                                            tx_id,
956                                            view_id: view_id_clone,
957                                            cellmap: filtered_cellmap,
958                                            window,
959                                        });
960                                    }
961                                    Err(e) => {
962                                        log::error!(
963                                            "Failed to create view cell for {}: {}",
964                                            view_id_clone,
965                                            e
966                                        );
967                                        if let Err(err) = priority_tx.try_send(
968                                            MykoMessage::ViewError(ViewError {
969                                                tx: tx_id.to_string(),
970                                                view_id: view_id_clone.to_string(),
971                                                message: e,
972                                            }),
973                                        ) {
974                                            drop_logger.on_drop("ViewError", &err);
975                                        }
976                                    }
977                                }
978                            });
979                        }
980                        Err(e) => {
981                            let message = format!("Failed to parse view {}: {}", view_id, e);
982                            log::error!(
983                                "{} | payload: {}",
984                                message,
985                                serde_json::to_string(&wrapped.view).unwrap_or_default()
986                            );
987                            if let Err(err) =
988                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
989                                    tx: tx_id.to_string(),
990                                    view_id: view_id.to_string(),
991                                    message,
992                                }))
993                            {
994                                drop_logger.on_drop("ViewError", &err);
995                            }
996                        }
997                    }
998                } else {
999                    let message = format!("No registered handler for view: {}", view_id);
1000                    log::warn!("{}", message);
1001                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1002                        tx: tx_id.to_string(),
1003                        view_id: view_id.to_string(),
1004                        message,
1005                    })) {
1006                        drop_logger.on_drop("ViewError", &err);
1007                    }
1008                }
1009            }
1010
1011            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1012                log::trace!(
1013                    "QueryCancel received: client={} tx={}",
1014                    session.client_id,
1015                    tx_id
1016                );
1017                let tx_id: Arc<str> = tx_id.into();
1018                if let Ok(mut map) = query_ids_by_tx.lock() {
1019                    map.remove(&tx_id);
1020                }
1021                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1022                    map.remove(&tx_id);
1023                }
1024                session.cancel(&tx_id);
1025            }
1026
1027            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1028                let tx_id: Arc<str> = tx.into();
1029                log::trace!(
1030                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1031                    session.client_id,
1032                    tx_id,
1033                    window.is_some(),
1034                    session.subscription_count()
1035                );
1036                session.update_query_window(&tx_id, window);
1037            }
1038            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1039                log::trace!("View cancel: {}", tx_id);
1040                let tx_id: Arc<str> = tx_id.into();
1041                if let Ok(mut map) = view_ids_by_tx.lock() {
1042                    map.remove(&tx_id);
1043                }
1044                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1045                    map.remove(&tx_id);
1046                }
1047                session.cancel(&tx_id);
1048            }
1049            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1050                let tx_id: Arc<str> = tx.into();
1051                log::trace!("View window update: {}", tx_id);
1052                session.update_view_window(&tx_id, window);
1053            }
1054
1055            MykoMessage::Report(wrapped) => {
1056                // Extract tx from the report JSON
1057                let tx_id: Arc<str> = wrapped
1058                    .report
1059                    .get("tx")
1060                    .and_then(|v| v.as_str())
1061                    .unwrap_or("unknown")
1062                    .into();
1063                let report_id = &wrapped.report_id;
1064
1065                log::trace!(
1066                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1067                    session.client_id,
1068                    tx_id,
1069                    report_id,
1070                    session.subscription_count()
1071                );
1072
1073                // Look up the report registration
1074                if let Some(report_data) = handler_registry.get_report(report_id) {
1075                    // Parse the report JSON to the concrete type
1076                    let parsed = (report_data.parse)(wrapped.report.clone());
1077                    match parsed {
1078                        Ok(any_report) => {
1079                            let request_context = Arc::new(RequestContext::from_client(
1080                                tx_id.clone(),
1081                                session.client_id.clone(),
1082                                host_id,
1083                            ));
1084
1085                            // Create the cell using the factory (with host_id for context)
1086                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1087                                Ok(cell) => {
1088                                    session.subscribe_report(
1089                                        tx_id,
1090                                        report_id.as_str().into(),
1091                                        cell,
1092                                    );
1093                                }
1094                                Err(e) => {
1095                                    log::error!(
1096                                        "Failed to create report cell for {}: {}",
1097                                        report_id,
1098                                        e
1099                                    );
1100                                }
1101                            }
1102                        }
1103                        Err(e) => {
1104                            log::error!(
1105                                "Failed to parse report {}: {} | payload: {}",
1106                                report_id,
1107                                e,
1108                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1109                            );
1110                        }
1111                    }
1112                } else {
1113                    log::warn!("No registered handler for report: {}", report_id);
1114                }
1115            }
1116
1117            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1118                log::trace!(
1119                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1120                    session.client_id,
1121                    tx_id,
1122                    session.subscription_count()
1123                );
1124                session.cancel(&tx_id.into());
1125            }
1126
1127            MykoMessage::Event(mut event) => {
1128                event.sanitize_null_bytes();
1129                normalize_incoming_event(&mut event, &session.client_id, host_id);
1130                if let Err(e) = ctx.apply_event(event) {
1131                    log::error!(
1132                        "Failed to apply event from client {}: {e}",
1133                        session.client_id
1134                    );
1135                }
1136            }
1137
1138            MykoMessage::EventBatch(mut events) => {
1139                let incoming = events.len();
1140                if incoming >= 64 {
1141                    log::trace!(
1142                        "Received event batch from client {} size={}",
1143                        session.client_id,
1144                        incoming
1145                    );
1146                }
1147                for event in &mut events {
1148                    event.sanitize_null_bytes();
1149                    normalize_incoming_event(event, &session.client_id, host_id);
1150                }
1151                match ctx.apply_event_batch(events) {
1152                    Ok(applied) => {
1153                        log::trace!(
1154                            "Applied event batch from client {} size={}",
1155                            session.client_id,
1156                            applied
1157                        );
1158                    }
1159                    Err(e) => {
1160                        log::error!(
1161                            "Failed to apply event batch from client {}: {}",
1162                            session.client_id,
1163                            e
1164                        );
1165                    }
1166                }
1167            }
1168
1169            MykoMessage::Command(wrapped) => {
1170                // Extract tx from the command JSON
1171                let tx_id: Arc<str> = wrapped
1172                    .command
1173                    .get("tx")
1174                    .and_then(|v| v.as_str())
1175                    .unwrap_or("unknown")
1176                    .into();
1177
1178                let command_id = &wrapped.command_id;
1179
1180                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1181                let received_at = Instant::now();
1182                if let Ok(mut map) = command_started_by_tx.lock() {
1183                    map.insert(tx_id.clone(), received_at);
1184                }
1185                if let Err(e) = command_tx.send(CommandJob {
1186                    tx_id: tx_id.clone(),
1187                    command_id: wrapped.command_id.clone(),
1188                    command: wrapped.command.clone(),
1189                    received_at,
1190                }) {
1191                    log::error!(
1192                        "Failed to enqueue command {} for client {} tx={}: {}",
1193                        command_id,
1194                        session.client_id,
1195                        tx_id,
1196                        e
1197                    );
1198                    let error = MykoMessage::CommandError(CommandError {
1199                        tx: tx_id.to_string(),
1200                        command_id: command_id.to_string(),
1201                        message: "Command queue unavailable".to_string(),
1202                    });
1203                    if let Err(err) = priority_tx.try_send(error) {
1204                        drop_logger.on_drop("CommandError", &err);
1205                    }
1206                }
1207            }
1208
1209            MykoMessage::Ping(PingData { id, timestamp }) => {
1210                // Echo back the ping data
1211                let pong = MykoMessage::Ping(PingData { id, timestamp });
1212                if let Err(e) = priority_tx.try_send(pong) {
1213                    drop_logger.on_drop("Ping", &e);
1214                }
1215            }
1216
1217            // Response messages - these shouldn't come from clients.
1218            MykoMessage::QueryResponse(resp) => {
1219                log::warn!(
1220                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1221                    session.client_id,
1222                    resp.tx,
1223                    resp.sequence,
1224                    resp.upserts.len(),
1225                    resp.deletes.len(),
1226                    session.subscription_count()
1227                );
1228            }
1229            MykoMessage::QueryError(err) => {
1230                log::warn!(
1231                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1232                    session.client_id,
1233                    err.tx,
1234                    err.query_id,
1235                    err.message,
1236                    session.subscription_count()
1237                );
1238            }
1239            MykoMessage::ViewResponse(resp) => {
1240                log::warn!(
1241                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1242                    session.client_id,
1243                    resp.tx,
1244                    resp.sequence,
1245                    resp.upserts.len(),
1246                    resp.deletes.len(),
1247                    session.subscription_count()
1248                );
1249            }
1250            MykoMessage::ViewError(err) => {
1251                log::warn!(
1252                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1253                    session.client_id,
1254                    err.tx,
1255                    err.view_id,
1256                    err.message,
1257                    session.subscription_count()
1258                );
1259            }
1260            MykoMessage::ReportResponse(resp) => {
1261                log::warn!(
1262                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1263                    session.client_id,
1264                    resp.tx,
1265                    session.subscription_count()
1266                );
1267            }
1268            MykoMessage::ReportError(err) => {
1269                log::warn!(
1270                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1271                    session.client_id,
1272                    err.tx,
1273                    err.report_id,
1274                    err.message,
1275                    session.subscription_count()
1276                );
1277            }
1278            MykoMessage::CommandResponse(resp) => {
1279                if resp.tx.trim().is_empty() {
1280                    log::warn!(
1281                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1282                        session.client_id,
1283                        session.subscription_count()
1284                    );
1285                } else {
1286                    let correlated = outbound_commands_by_tx
1287                        .lock()
1288                        .ok()
1289                        .and_then(|mut map| map.remove(&resp.tx));
1290                    if let Some((command_id, started)) = correlated {
1291                        // Success-path per-command roundtrip detail: fires once
1292                        // per correlated command response (ExecTargetAction etc.),
1293                        // tens of thousands per second under load. Keep at trace;
1294                        // the unmatched/error paths below stay at warn.
1295                        log::trace!(
1296                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1297                            session.client_id,
1298                            resp.tx,
1299                            command_id,
1300                            started.elapsed().as_millis(),
1301                            session.subscription_count()
1302                        );
1303                    } else {
1304                        log::warn!(
1305                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1306                            session.client_id,
1307                            resp.tx,
1308                            session.subscription_count()
1309                        );
1310                    }
1311                }
1312            }
1313            MykoMessage::CommandError(err) => {
1314                if err.tx.trim().is_empty() {
1315                    log::warn!(
1316                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1317                        session.client_id,
1318                        err.command_id,
1319                        err.message,
1320                        session.subscription_count()
1321                    );
1322                } else {
1323                    let correlated = outbound_commands_by_tx
1324                        .lock()
1325                        .ok()
1326                        .and_then(|mut map| map.remove(&err.tx));
1327                    if let Some((command_id, started)) = correlated {
1328                        log::warn!(
1329                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1330                            session.client_id,
1331                            err.tx,
1332                            err.command_id,
1333                            command_id,
1334                            err.message,
1335                            started.elapsed().as_millis(),
1336                            session.subscription_count()
1337                        );
1338                    } else {
1339                        log::warn!(
1340                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1341                            session.client_id,
1342                            err.tx,
1343                            err.command_id,
1344                            err.message,
1345                            session.subscription_count()
1346                        );
1347                    }
1348                }
1349            }
1350            MykoMessage::Benchmark(payload) => {
1351                let stats = ws_benchmark_stats();
1352                ensure_ws_benchmark_logger();
1353                stats.message_count.fetch_add(1, Ordering::Relaxed);
1354                // Estimate payload size from the JSON value
1355                let size = payload.to_string().len() as u64;
1356                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1357            }
1358        }
1359
1360        Ok(())
1361    }
1362
1363    fn execute_command_job(
1364        ctx: Arc<CellServerCtx>,
1365        priority_tx: &mpsc::Sender<MykoMessage>,
1366        drop_logger: &DropLogger,
1367        client_id: Arc<str>,
1368        job: CommandJob,
1369    ) {
1370        let host_id = ctx.host_id;
1371        let started = Instant::now();
1372        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1373        let command_id = job.command_id.clone();
1374
1375        let mut handler_found = false;
1376        for registration in inventory::iter::<CommandHandlerRegistration> {
1377            if registration.command_id == command_id {
1378                handler_found = true;
1379                let executor = (registration.factory)();
1380
1381                let req = Arc::new(RequestContext::from_client(
1382                    job.tx_id.clone(),
1383                    client_id.clone(),
1384                    host_id,
1385                ));
1386                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1387                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1388                let execute_started = Instant::now();
1389
1390                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1391                    Ok(result) => {
1392                        let response = MykoMessage::CommandResponse(CommandResponse {
1393                            response: result,
1394                            tx: job.tx_id.to_string(),
1395                        });
1396                        if let Err(e) = priority_tx.try_send(response) {
1397                            drop_logger.on_drop("CommandResponse", &e);
1398                        }
1399                    }
1400                    Err(e) => {
1401                        let error = MykoMessage::CommandError(CommandError {
1402                            tx: job.tx_id.to_string(),
1403                            command_id: command_id.clone(),
1404                            message: e.message,
1405                        });
1406                        if let Err(err) = priority_tx.try_send(error) {
1407                            drop_logger.on_drop("CommandError", &err);
1408                        }
1409                    }
1410                }
1411                let execute_ms = execute_started.elapsed().as_millis();
1412                let total_ms = job.received_at.elapsed().as_millis();
1413                log::trace!(
1414                    target: "myko_server::ws_perf",
1415                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1416                    client_id,
1417                    job.tx_id,
1418                    command_id,
1419                    queue_wait_ms,
1420                    execute_ms,
1421                    total_ms
1422                );
1423                break;
1424            }
1425        }
1426
1427        if !handler_found {
1428            log::warn!("No registered handler for command: {}", command_id);
1429            let error = MykoMessage::CommandError(CommandError {
1430                tx: job.tx_id.to_string(),
1431                command_id: command_id.clone(),
1432                message: format!("Command handler not found: {}", command_id),
1433            });
1434            if let Err(e) = priority_tx.try_send(error) {
1435                drop_logger.on_drop("CommandError", &e);
1436            }
1437        }
1438
1439        if !handler_found {
1440            log::debug!(
1441                target: "myko_server::ws_perf",
1442                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1443                client_id,
1444                job.tx_id,
1445                command_id,
1446                queue_wait_ms,
1447                job.received_at.elapsed().as_millis()
1448            );
1449        }
1450    }
1451}
1452
/// Channel-based WebSocket writer.
///
/// Sends messages through mpsc channels; the connection's write task drains
/// them and forwards to the actual WebSocket. Every enqueue is non-blocking
/// (`try_send`), and sends to a channel whose receiver is gone are skipped
/// silently.
struct ChannelWriter {
    /// Primary outbound queue: plain `MykoMessage`s and pre-serialized commands.
    tx: mpsc::Sender<OutboundMessage>,
    /// Separate queue for report and query responses (`DeferredOutbound`).
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Records enqueue failures other than "channel closed".
    drop_logger: Arc<DropLogger>,
    /// Current outgoing wire format, stored as a `MykoProtocol` discriminant
    /// and read by `protocol()`. Shared via `Arc`; presumably the connection's
    /// read loop promotes it (e.g. JSON -> CBOR) — TODO confirm against caller.
    outgoing_format: Arc<AtomicU8>,
}
1463
1464impl ChannelWriter {
1465    /// Cheap "is the writer task gone?" check used to short-circuit
1466    /// subscriber callbacks for a disconnected client. `Sender::is_closed`
1467    /// returns true once the matching receiver is dropped, which happens as
1468    /// soon as the write task exits. Avoiding the work here prevents the
1469    /// "buffer full / channel closed" log storm we used to see for 10+
1470    /// seconds after every disconnect while the session was still tearing
1471    /// down its subscription guards.
1472    #[inline]
1473    fn tx_dead(&self) -> bool {
1474        self.tx.is_closed()
1475    }
1476
1477    #[inline]
1478    fn deferred_dead(&self) -> bool {
1479        self.deferred_tx.is_closed()
1480    }
1481}
1482
1483impl WsWriter for ChannelWriter {
1484    fn send(&self, msg: MykoMessage) {
1485        // Fast path: writer is gone. Don't try to send and don't log; the
1486        // dead-channel state is expected after the client disconnects, and
1487        // a dropped subscription will follow shortly when the session
1488        // teardown finishes. Avoiding the log+counter prevents the
1489        // "buffer full / channel closed" warning storm we used to see for
1490        // 10+ seconds after every disconnect.
1491        if self.tx_dead() {
1492            return;
1493        }
1494        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1495            // Closed errors here race with the writer task exiting between
1496            // the is_dead check and the try_send; suppress them too.
1497            if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1498                self.drop_logger.on_drop("message", &e);
1499            }
1500        }
1501    }
1502
1503    fn protocol(&self) -> MykoProtocol {
1504        MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1505    }
1506
1507    fn send_serialized_command(
1508        &self,
1509        tx: Arc<str>,
1510        command_id: String,
1511        payload: EncodedCommandMessage,
1512    ) {
1513        if self.tx_dead() {
1514            return;
1515        }
1516        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1517            tx,
1518            command_id,
1519            payload,
1520        }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1521        {
1522            self.drop_logger.on_drop("serialized_command", &e);
1523        }
1524    }
1525
1526    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1527        if self.deferred_dead() {
1528            return;
1529        }
1530        if let Err(e) = self
1531            .deferred_tx
1532            .try_send(DeferredOutbound::Report(tx, output))
1533            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1534        {
1535            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1536        }
1537    }
1538
1539    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1540        if self.deferred_dead() {
1541            return;
1542        }
1543        if let Err(e) = self
1544            .deferred_tx
1545            .try_send(DeferredOutbound::Query { response, is_view })
1546            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1547        {
1548            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1549        }
1550    }
1551}
1552
#[cfg(test)]
mod tests {
    use super::*;

    /// A `ChannelWriter` plus the receivers and shared format cell that must
    /// outlive it for the channels to stay open.
    struct Harness {
        writer: ChannelWriter,
        rx: mpsc::Receiver<OutboundMessage>,
        deferred_rx: mpsc::Receiver<DeferredOutbound>,
        outgoing_format: Arc<AtomicU8>,
    }

    /// Builds a writer over fresh channels of `capacity`, starting in JSON mode.
    fn harness(capacity: usize) -> Harness {
        let (tx, rx) = mpsc::channel(capacity);
        let (deferred_tx, deferred_rx) = mpsc::channel(capacity);
        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format: Arc::clone(&outgoing_format),
        };
        Harness {
            writer,
            rx,
            deferred_rx,
            outgoing_format,
        }
    }

    fn ping() -> MykoMessage {
        MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        })
    }

    #[test]
    fn test_channel_writer() {
        let mut h = harness(10);
        h.writer.send(ping());

        let received = h.rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    #[test]
    fn writer_reports_dead_channels_after_receivers_drop() {
        let Harness {
            writer,
            rx,
            deferred_rx,
            ..
        } = harness(10);
        assert!(!writer.tx_dead());
        assert!(!writer.deferred_dead());

        drop(rx);
        assert!(writer.tx_dead());
        assert!(!writer.deferred_dead());

        drop(deferred_rx);
        assert!(writer.deferred_dead());

        // Sending to a writer whose task is gone is a silent no-op, not a panic.
        writer.send(ping());
    }

    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        let h = harness(10);

        // Initially JSON.
        assert_eq!(h.writer.protocol(), MykoProtocol::JSON);

        // Simulate the read loop promoting the connection after a binary
        // frame: the writer shares the same cell and must observe the change.
        h.outgoing_format
            .store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(h.writer.protocol(), MykoProtocol::CBOR);
    }
}