Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    client::MykoProtocol,
22    command::{CommandContext, CommandHandlerRegistration},
23    entities::client::{Client, ClientId},
24    relationship::{
25        iter_client_id_registrations, iter_fallback_to_id_registrations,
26        iter_server_owned_registrations,
27    },
28    report::AnyOutput,
29    request::RequestContext,
30    server::{
31        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
32        client_registry::try_client_registry,
33    },
34    wire::{
35        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
36        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
37    },
38};
39use tokio::{net::TcpStream, sync::mpsc, time::interval};
40use tokio_tungstenite::{
41    accept_async_with_config,
42    tungstenite::{Message, protocol::WebSocketConfig},
43};
44use uuid::Uuid;
45
/// Per-entity-type event counters fed by `record_ws_ingest`.
///
/// The logger thread started by `ensure_ws_ingest_logger` swaps every counter
/// back to zero once per second, so each value covers the current window only.
struct WsIngestStats {
    /// Events counted since the last logger pass, keyed by the event's `item_type`.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Message and byte totals reported by the benchmark logger thread.
///
/// Both counters are swapped back to zero once per second by the thread started
/// in `ensure_ws_benchmark_logger`. The code that increments them is not in this
/// part of the file.
struct WsBenchmarkStats {
    /// Messages counted since the last logger pass.
    message_count: AtomicU64,
    /// Bytes counted since the last logger pass.
    total_bytes: AtomicU64,
}
54
/// Process-wide ingest counters, created lazily by `ws_ingest_stats`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
/// Flipped to `true` by the first `ensure_ws_ingest_logger` call so the logger
/// thread is spawned at most once.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
/// Process-wide benchmark counters, created lazily by `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Flipped to `true` by the first `ensure_ws_benchmark_logger` call so the logger
/// thread is spawned at most once.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    bytes / count
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[server_owned] fields with this server's ID
192    for reg in iter_server_owned_registrations() {
193        if reg.entity_type == event.item_type {
194            if let Some(obj) = event.item.as_object_mut() {
195                let field = reg.field_name_json;
196                let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
197                if current.is_empty() {
198                    obj.insert(
199                        field.to_string(),
200                        serde_json::Value::String(host_id.to_string()),
201                    );
202                }
203            }
204            break;
205        }
206    }
207
208    // Auto-populate #[fallback_to_id] fields with the entity's own id
209    // if the field is null or missing.
210    if let Some(obj) = event.item.as_object_mut()
211        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
212    {
213        for reg in iter_fallback_to_id_registrations() {
214            if reg.entity_type == event.item_type {
215                let field = reg.field_name_json;
216                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
217                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
218                }
219            }
220        }
221    }
222}
223
/// Per-connection drop tracking to avoid log storms when clients fall behind.
///
/// When the outbound channel is full, we will drop messages (same as today),
/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
struct DropLogger {
    /// Id of the client this logger belongs to; included in the warning text.
    client_id: Arc<str>,
    /// Drops recorded since the last emitted warning (reset to 0 when `on_drop` logs).
    dropped: std::sync::atomic::AtomicU64,
    /// Unix-epoch milliseconds of the last emitted warning; 0 until the first one.
    last_log_ms: std::sync::atomic::AtomicU64,
}
233
234impl DropLogger {
235    fn new(client_id: Arc<str>) -> Self {
236        Self {
237            client_id,
238            dropped: std::sync::atomic::AtomicU64::new(0),
239            last_log_ms: std::sync::atomic::AtomicU64::new(0),
240        }
241    }
242
243    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
244        use std::sync::atomic::Ordering;
245
246        self.dropped.fetch_add(1, Ordering::Relaxed);
247
248        // Log at most once per second per connection.
249        let now_ms = std::time::SystemTime::now()
250            .duration_since(std::time::UNIX_EPOCH)
251            .unwrap_or_default()
252            .as_millis() as u64;
253        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
254        if now_ms.saturating_sub(last_ms) < 1000 {
255            return;
256        }
257
258        if self
259            .last_log_ms
260            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
261            .is_err()
262        {
263            return;
264        }
265
266        let n = self.dropped.swap(0, Ordering::Relaxed);
267        log::warn!(
268            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
269            n,
270            self.client_id,
271            kind,
272            err
273        );
274    }
275}
276
/// A unit of work queued on the per-connection command channel.
///
/// The command worker task receives these and runs each one through
/// `execute_command_job` on the blocking thread pool.
struct CommandJob {
    /// Transaction id of the command. After the job finishes (or panics) the
    /// worker removes this key from the command-start timing map.
    tx_id: Arc<str>,
    /// Identifier of the command — presumably selects the registered handler;
    /// confirm in `execute_command_job`.
    command_id: String,
    /// The command as a JSON value, passed through to `execute_command_job`.
    command: serde_json::Value,
    /// Timestamp captured for this job — presumably when the command message
    /// was received; confirm at the construction site.
    received_at: Instant,
}
283
/// Result of an async subscription build (query or view cell_factory).
///
/// Delivered to the connection loop over the subscribe channel. On receipt the
/// loop drops the matching entry from the subscribe-start timing map and
/// registers the cell map with the `ClientSession`.
enum SubscriptionReady {
    /// A built query subscription; registered via `ClientSession::subscribe_query`.
    Query {
        /// Transaction id of the subscription request.
        tx_id: Arc<str>,
        /// The cell map produced by the build.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window, forwarded unchanged to the session.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A built view subscription; registered via
    /// `ClientSession::subscribe_view_with_id`.
    View {
        /// Transaction id of the subscription request.
        tx_id: Arc<str>,
        /// Id of the view, forwarded to the session alongside `tx_id`.
        view_id: Arc<str>,
        /// The cell map produced by the build.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window, forwarded unchanged to the session.
        window: Option<myko::wire::QueryWindow>,
    },
}
298
/// An item waiting to be written to the client's WebSocket by the writer task.
enum OutboundMessage {
    /// A protocol message that the writer task serializes at send time: as a
    /// CBOR binary frame when the session's outgoing format is CBOR, otherwise
    /// as a JSON text frame.
    Message(MykoMessage),
    /// A command whose payload was encoded before it was queued. The writer
    /// sends the payload as-is (text frame for JSON, binary frame for CBOR).
    SerializedCommand {
        /// Transaction id; when non-blank the writer records it in the
        /// outbound-command map together with `command_id` and the send time.
        tx: Arc<str>,
        /// Command identifier stored alongside `tx` in the outbound-command map.
        command_id: String,
        /// The already-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
307
/// Outbound items whose conversion to a wire message is deferred to the
/// writer task.
enum DeferredOutbound {
    /// A report result for the given transaction id. The writer turns it into a
    /// `MykoMessage::ReportResponse` by calling `to_value()` on the output.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// A pending query/view response. The writer calls `into_wire()` on it and
    /// wraps the result according to `is_view`.
    Query {
        /// The response still to be converted to its wire form.
        response: PendingQueryResponse,
        /// `true` sends a `ViewResponse`; `false` sends a `QueryResponse`.
        is_view: bool,
    },
}
315
/// WebSocket handler for a single client connection.
///
/// A unit struct used as a namespace: the connection entry points are
/// associated functions and carry no per-handler state.
pub struct WsHandler;
318
319impl WsHandler {
320    /// Handle a new WebSocket connection (performs the handshake).
321    pub async fn handle_connection(
322        stream: TcpStream,
323        addr: SocketAddr,
324        ctx: Arc<CellServerCtx>,
325    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
326        let mut ws_config = WebSocketConfig::default();
327        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
328        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
329        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
330        Self::handle_upgraded(ws_stream, addr, ctx).await
331    }
332
333    /// Handle a WebSocket connection whose HTTP/1.1 handshake has already
334    /// completed and produced a [`tokio_tungstenite::WebSocketStream`].
335    ///
336    /// Used by the front-door router when it pre-parses the HTTP request
337    /// (to dispatch between `/myko` WS and `/myko/mcp` HTTP/WS) and then
338    /// completes the WS handshake itself.
339    #[allow(clippy::too_many_arguments)]
340    pub async fn handle_upgraded(
341        ws_stream: tokio_tungstenite::WebSocketStream<TcpStream>,
342        addr: SocketAddr,
343        ctx: Arc<CellServerCtx>,
344    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
345        ensure_ws_ingest_logger();
346
347        let host_id = ctx.host_id;
348
349        let (mut write, mut read) = ws_stream.split();
350
351        // Create a bounded channel for sending messages to the client
352        // High limit (10k) since we have good memory availability
353        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
354        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
355        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
356        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
357        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
358
359        // Outgoing format for this session: defaults to JSON, sticky-promotes
360        // to CBOR on the first received binary frame. Never demotes.
361        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
362
363        // Create client session with channel-based writer
364        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
365        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
366        let writer = ChannelWriter {
367            tx: tx.clone(),
368            deferred_tx: deferred_tx.clone(),
369            drop_logger: drop_logger.clone(),
370            outgoing_format: outgoing_format.clone(),
371        };
372
373        // Register writer in the global client registry (if initialized)
374        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
375            tx: tx.clone(),
376            deferred_tx: deferred_tx.clone(),
377            drop_logger: drop_logger.clone(),
378            outgoing_format: outgoing_format.clone(),
379        });
380        if let Some(registry) = try_client_registry() {
381            registry.register(client_id.clone(), writer_arc);
382        }
383
384        let mut session = ClientSession::new(client_id.clone(), writer);
385
386        let outgoing_format_writer = outgoing_format.clone();
387        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
388            Arc::new(Mutex::new(HashMap::new()));
389        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
390            Arc::new(Mutex::new(HashMap::new()));
391        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
392            Arc::new(Mutex::new(HashMap::new()));
393        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
394            Arc::new(Mutex::new(HashMap::new()));
395        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
396            Arc::new(Mutex::new(HashMap::new()));
397
398        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
399
400        // Publish Client entity
401        let client_entity = Client {
402            id: ClientId(client_id.clone()),
403            server_id: host_id.to_string().into(),
404            address: Some(Arc::from(addr.to_string())),
405            windback: None,
406        };
407        if let Err(e) = ctx.set(&client_entity) {
408            log::error!("Failed to persist client entity: {e}");
409        }
410
411        log::info!("Client connected: {} from {}", client_id, addr);
412
413        let write_ctx = ctx.clone();
414        let write_client_id = client_id.clone();
415        let write_addr = addr;
416        let command_ctx = ctx.clone();
417        let command_priority_tx = priority_tx.clone();
418        let command_drop_logger = drop_logger.clone();
419        let command_client_id = client_id.clone();
420
421        // Spawn task to forward messages from channel to WebSocket
422        let write_task = tokio::spawn(async move {
423            let _ctx = write_ctx;
424            let mut normal_open = true;
425            let mut priority_open = true;
426            let mut deferred_open = true;
427            while normal_open || priority_open || deferred_open {
428                let msg = tokio::select! {
429                    biased;
430                    maybe = priority_rx.recv(), if priority_open => {
431                        match maybe {
432                            Some(msg) => OutboundMessage::Message(msg),
433                            None => {
434                                priority_open = false;
435                                continue;
436                            }
437                        }
438                    }
439                    maybe = deferred_rx.recv(), if deferred_open => {
440                        match maybe {
441                            Some(DeferredOutbound::Report(tx, output)) => {
442                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
443                                    response: output.to_value(),
444                                    tx: tx.to_string(),
445                                }))
446                            }
447                            Some(DeferredOutbound::Query { response, is_view }) => {
448                                if is_view {
449                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
450                                } else {
451                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
452                                }
453                            }
454                            None => {
455                                deferred_open = false;
456                                continue;
457                            }
458                        }
459                    }
460                    maybe = rx.recv(), if normal_open => {
461                        match maybe {
462                            Some(msg) => msg,
463                            None => {
464                                normal_open = false;
465                                continue;
466                            }
467                        }
468                    }
469                };
470                // Per-message timing tap (outbound). Counts by kind into the
471                // global ws_timing instrumentation. Counterpart to the inbound
472                // tap in `handle_message`.
473                match &msg {
474                    OutboundMessage::SerializedCommand { .. } => {
475                        crate::ws_timing::record_outbound("Command")
476                    }
477                    OutboundMessage::Message(m) => {
478                        crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
479                    }
480                };
481                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
482                    OutboundMessage::SerializedCommand { tx, .. } => {
483                        ("command", Some(tx.clone()), None, None, None, None)
484                    }
485                    OutboundMessage::Message(msg) => match msg {
486                        MykoMessage::ViewResponse(r) => (
487                            "view_response",
488                            Some(r.tx.clone()),
489                            Some(r.sequence),
490                            Some(r.upserts.len()),
491                            Some(r.deletes.len()),
492                            r.total_count,
493                        ),
494                        MykoMessage::QueryResponse(r) => (
495                            "query_response",
496                            Some(r.tx.clone()),
497                            Some(r.sequence),
498                            Some(r.upserts.len()),
499                            Some(r.deletes.len()),
500                            r.total_count,
501                        ),
502                        MykoMessage::CommandResponse(r) => (
503                            "command_response",
504                            Some(Arc::<str>::from(r.tx.clone())),
505                            None,
506                            None,
507                            None,
508                            None,
509                        ),
510                        MykoMessage::CommandError(r) => (
511                            "command_error",
512                            Some(Arc::<str>::from(r.tx.clone())),
513                            None,
514                            None,
515                            None,
516                            None,
517                        ),
518                        _ => ("other", None, None, None, None, None),
519                    },
520                };
521
522                match &msg {
523                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
524                        if !tx.trim().is_empty()
525                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
526                        {
527                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
528                        }
529                    }
530                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
531                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
532                            && !tx_id.trim().is_empty()
533                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
534                        {
535                            map.insert(
536                                tx_id.to_string(),
537                                (wrapped.command_id.clone(), Instant::now()),
538                            );
539                        }
540                    }
541                    _ => {}
542                }
543
544                let ws_msg = match &msg {
545                    OutboundMessage::SerializedCommand {
546                        payload: EncodedCommandMessage::Json(json),
547                        ..
548                    } => Message::Text(json.clone().into()),
549                    OutboundMessage::SerializedCommand {
550                        payload: EncodedCommandMessage::Cbor(bytes),
551                        ..
552                    } => Message::Binary(bytes.clone().into()),
553                    OutboundMessage::Message(msg)
554                        if outgoing_format_writer.load(Ordering::SeqCst)
555                            == MykoProtocol::CBOR as u8 =>
556                    {
557                        let mut bytes = Vec::new();
558                        match ciborium::ser::into_writer(msg, &mut bytes) {
559                            Ok(()) => Message::Binary(bytes.into()),
560                            Err(e) => {
561                                log::error!("Failed to serialize message to CBOR: {}", e);
562                                continue;
563                            }
564                        }
565                    }
566                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
567                        Ok(json) => Message::Text(json.into()),
568                        Err(e) => {
569                            log::error!("Failed to serialize message to JSON: {}", e);
570                            continue;
571                        }
572                    },
573                };
574                let payload_bytes = match &ws_msg {
575                    Message::Binary(b) => b.len(),
576                    Message::Text(t) => t.len(),
577                    _ => 0,
578                };
579
580                if let Err(err) = write.send(ws_msg).await {
581                    log::error!(
582                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
583                        write_client_id,
584                        write_addr,
585                        kind,
586                        tx_id,
587                        seq,
588                        payload_bytes,
589                        outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
590                        err
591                    );
592                    break;
593                }
594            }
595            // NOTE(ts): Unregister from client registry immediately so the node
596            // executor stops serializing commands into a dead channel.
597            if let Some(registry) = try_client_registry() {
598                registry.unregister(&write_client_id);
599                log::info!(
600                    "WebSocket writer unregistered client {} from {} (write task exiting)",
601                    write_client_id,
602                    write_addr,
603                );
604            }
605            log::warn!(
606                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
607                write_client_id,
608                write_addr,
609                normal_open,
610                priority_open,
611                deferred_open
612            );
613        });
614
615        // Execute commands on a dedicated worker so ping/cancel traffic is never
616        // blocked by long-running command handlers.
617        let command_started_cleanup = command_started_by_tx.clone();
618        let command_task = tokio::spawn(async move {
619            while let Some(job) = command_rx.recv().await {
620                let command_ctx = command_ctx.clone();
621                let command_priority_tx = command_priority_tx.clone();
622                let command_drop_logger = command_drop_logger.clone();
623                let command_client_id = command_client_id.clone();
624                let tx_id = job.tx_id.clone();
625                let started_map = command_started_cleanup.clone();
626                match tokio::task::spawn_blocking(move || {
627                    Self::execute_command_job(
628                        command_ctx,
629                        &command_priority_tx,
630                        command_drop_logger.as_ref(),
631                        command_client_id,
632                        job,
633                    );
634                })
635                .await
636                {
637                    Ok(()) => {}
638                    Err(e) => {
639                        log::error!("Command worker panicked: {}", e);
640                    }
641                }
642                // NOTE(ts): Clean up timing entry after command completes (success or panic).
643                if let Ok(mut map) = started_map.lock() {
644                    map.remove(&tx_id);
645                }
646            }
647        });
648
649        // Process incoming messages and completed subscription builds concurrently.
650        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
651        // so they don't block command processing or other messages.
652        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
653        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
654        loop {
655            tokio::select! {
656                // Completed subscription builds — register with session
657                Some(ready) = subscribe_rx.recv() => {
658                    let tx_id = match &ready {
659                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
660                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
661                    };
662                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
663                        map.remove(&tx_id);
664                    }
665                    match ready {
666                        SubscriptionReady::Query { tx_id, cellmap, window } => {
667                            session.subscribe_query(tx_id, cellmap, window);
668                        }
669                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
670                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
671                        }
672                    }
673                }
674                // NOTE(ts): Sweep outbound command entries older than 10s.
675                // Responses normally arrive quickly; stale entries are from
676                // dropped connections or commands that will never get a response.
677                _ = outbound_ttl_interval.tick() => {
678                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
679                        let before = map.len();
680                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
681                        let removed = before - map.len();
682                        if removed > 0 {
683                            log::debug!(
684                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
685                                session.client_id,
686                                removed,
687                                map.len()
688                            );
689                        }
690                    }
691                }
692                // Incoming WebSocket messages
693                msg = read.next() => {
694                    let Some(msg) = msg else { break };
695                    let ctx = ctx.clone();
696                    let msg = match msg {
697                        Ok(m) => m,
698                        Err(e) => {
699                            log::error!("WebSocket read error from {}: {}", client_id, e);
700                            break;
701                        }
702                    };
703
704                    match msg {
705                        Message::Binary(data) => {
706                            if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
707                                log::debug!(
708                                    "Client {} promoted outgoing format to CBOR via demonstration",
709                                    client_id
710                                );
711                                outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
712                            }
713
714                            match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
715                                Ok(myko_msg) => {
716                                    if let Err(e) = Self::handle_message(
717                                        &mut session,
718                                        ctx,
719                                        &priority_tx,
720                                        &drop_logger,
721                                        &query_ids_by_tx,
722                                        &view_ids_by_tx,
723                                        &subscribe_started_by_tx,
724                                        &command_started_by_tx,
725                                        &outbound_commands_by_tx,
726                                        &command_tx,
727                                        &subscribe_tx,
728                                        myko_msg,
729                                    ) {
730                                        log::error!("Error handling message: {}", e);
731                                    }
732                                    tokio::task::yield_now().await;
733                                }
734                                Err(e) => {
735                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
736                                }
737                            }
738                        }
739                        Message::Text(text) => {
740                            match serde_json::from_str::<MykoMessage>(&text) {
741                                Ok(myko_msg) => {
742                                    if let Err(e) = Self::handle_message(
743                                        &mut session,
744                                        ctx,
745                                        &priority_tx,
746                                        &drop_logger,
747                                        &query_ids_by_tx,
748                                        &view_ids_by_tx,
749                                        &subscribe_started_by_tx,
750                                        &command_started_by_tx,
751                                        &outbound_commands_by_tx,
752                                        &command_tx,
753                                        &subscribe_tx,
754                                        myko_msg,
755                                    ) {
756                                        log::error!("Error handling message: {}", e);
757                                    }
758                                    tokio::task::yield_now().await;
759                                }
760                                Err(e) => {
761                                    log::warn!(
762                                        "Failed to parse JSON message from {}: {} | raw: {}",
763                                        client_id,
764                                        e,
765                                        if text.len() > 1000 {
766                                            &text[..1000]
767                                        } else {
768                                            &text
769                                        }
770                                    );
771                                }
772                            }
773                        }
774                        Message::Ping(data) => {
775                            log::trace!("Ping from {}", client_id);
776                            let _ = data;
777                        }
778                        Message::Pong(_) => {
779                            log::trace!("Pong from {}", client_id);
780                        }
781                        Message::Close(frame) => {
782                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
783                            break;
784                        }
785                        Message::Frame(_) => {}
786                    }
787                }
788            }
789        }
790
791        // Cleanup. Order matters:
792        // 1. Abort the write/command tasks first so their channel receivers
793        //    are dropped — `ChannelWriter` clones held by subscriber
794        //    callbacks then short-circuit via `tx_dead()` / `deferred_dead()`
795        //    instead of doing useless work and emitting log spam while we
796        //    tear down the session.
797        // 2. Unregister the client writer from the global registry so other
798        //    parts of the system stop dispatching commands here.
799        // 3. Move the session drop to a blocking thread. `drop(session)`
800        //    cancels each subscription guard, and hyphae's per-cell
801        //    unsubscribe is O(N) in the cell's subscriber list (Vec clone +
802        //    filter + ArcSwap store), so a session with thousands of
803        //    subscriptions can take significant CPU time. Doing it on the
804        //    async runtime would block other connections' tasks; the
805        //    blocking pool is the right place for this.
806        write_task.abort();
807        command_task.abort();
808        if let Some(registry) = try_client_registry() {
809            registry.unregister(&client_id);
810        }
811
812        let drop_client_id = client_id.clone();
813        tokio::task::spawn_blocking(move || {
814            drop(session); // Drops all subscription guards
815            log::trace!(
816                "Client session subscriptions torn down for {}",
817                drop_client_id
818            );
819        });
820
821        // Delete Client entity
822        if let Err(e) = ctx.del(&client_entity) {
823            log::error!("Failed to delete client entity: {e}");
824        }
825
826        log::info!("Client disconnected: {} from {}", client_id, addr);
827
828        Ok(())
829    }
830
831    /// Handle a parsed MykoMessage.
832    #[allow(clippy::too_many_arguments)]
833    fn handle_message<W: WsWriter>(
834        session: &mut ClientSession<W>,
835        ctx: Arc<CellServerCtx>,
836        priority_tx: &mpsc::Sender<MykoMessage>,
837        drop_logger: &Arc<DropLogger>,
838        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
839        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
840        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
841        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
842        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
843        command_tx: &mpsc::UnboundedSender<CommandJob>,
844        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
845        msg: MykoMessage,
846    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
847        // Per-message timing tap. Counts inbound messages by kind into the
848        // global instrumentation. Periodic summarizer logs the deltas.
849        crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
850
851        let handler_registry = ctx.handler_registry.clone();
852
853        let registry = ctx.registry.clone();
854
855        let host_id = ctx.host_id;
856
857        match msg {
858            MykoMessage::Query(wrapped) => {
859                // Extract tx from the query JSON
860                let tx_id: Arc<str> = wrapped
861                    .query
862                    .get("tx")
863                    .and_then(|v| v.as_str())
864                    .unwrap_or("unknown")
865                    .into();
866                let query_id = &wrapped.query_id;
867                let entity_type = &wrapped.query_item_type;
868
869                // Defensive dedupe: some clients can replay the same subscribe request.
870                // A duplicate for the same tx would reset server-side sequence/window state.
871                if session.has_subscription(&tx_id) {
872                    log::debug!(
873                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
874                        session.client_id,
875                        tx_id,
876                        query_id,
877                        entity_type
878                    );
879                    return Ok(());
880                }
881                if let Ok(mut map) = query_ids_by_tx.lock() {
882                    map.insert(tx_id.clone(), query_id.clone());
883                }
884                if let Ok(mut map) = subscribe_started_by_tx.lock() {
885                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
886                }
887
888                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
889                log::trace!(
890                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
891                    session.client_id,
892                    tx_id,
893                    query_id,
894                    entity_type,
895                    wrapped.window.is_some(),
896                    session.subscription_count()
897                );
898
899                let request_context = Arc::new(RequestContext::from_client(
900                    tx_id.clone(),
901                    session.client_id.clone(),
902                    host_id,
903                ));
904
905                if let Some(query_data) = handler_registry.get_query(query_id) {
906                    let parsed = (query_data.parse)(wrapped.query.clone());
907                    match parsed {
908                        Ok(any_query) => {
909                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
910                            // block command processing or other messages.
911                            let cell_factory = query_data.cell_factory;
912                            let registry = registry.clone();
913                            let request_context = request_context.clone();
914                            let ctx = ctx.clone();
915                            let window = wrapped.window.clone();
916                            let query_id = query_id.clone();
917                            let sub_tx = subscribe_tx.clone();
918                            tokio::task::spawn_blocking(move || {
919                                match cell_factory(any_query, registry, request_context, Some(ctx))
920                                {
921                                    Ok(filtered_cellmap) => {
922                                        let _ = sub_tx.send(SubscriptionReady::Query {
923                                            tx_id,
924                                            cellmap: filtered_cellmap,
925                                            window,
926                                        });
927                                    }
928                                    Err(e) => {
929                                        log::error!(
930                                            "Failed to create query cell for {}: {}",
931                                            query_id,
932                                            e
933                                        );
934                                    }
935                                }
936                            });
937                        }
938                        Err(e) => {
939                            log::error!(
940                                "Failed to parse query {}: {} | payload: {}",
941                                query_id,
942                                e,
943                                serde_json::to_string(&wrapped.query).unwrap_or_default()
944                            );
945                        }
946                    }
947                } else {
948                    // Fall back to select all for unknown queries
949                    log::warn!(
950                        "No registered query handler for {}, falling back to select all",
951                        query_id
952                    );
953                    let store: myko::store::EntityStore =
954                        (*registry.get_or_create(entity_type)).clone();
955                    let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
956                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
957                }
958            }
959
960            MykoMessage::View(wrapped) => {
961                let tx_id: Arc<str> = wrapped
962                    .view
963                    .get("tx")
964                    .and_then(|v| v.as_str())
965                    .unwrap_or("unknown")
966                    .into();
967                let view_id = &wrapped.view_id;
968                let item_type = &wrapped.view_item_type;
969
970                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
971                if session.has_subscription(&tx_id) {
972                    log::debug!(
973                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
974                        session.client_id,
975                        tx_id,
976                        view_id,
977                        item_type
978                    );
979                    return Ok(());
980                }
981                if let Ok(mut map) = view_ids_by_tx.lock() {
982                    map.insert(tx_id.clone(), view_id.clone());
983                }
984                if let Ok(mut map) = subscribe_started_by_tx.lock() {
985                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
986                }
987
988                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
989                log::trace!(
990                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
991                    session.client_id,
992                    tx_id,
993                    view_id,
994                    item_type,
995                    wrapped.window
996                );
997
998                let request_context = Arc::new(RequestContext::from_client(
999                    tx_id.clone(),
1000                    session.client_id.clone(),
1001                    host_id,
1002                ));
1003
1004                if let Some(view_data) = handler_registry.get_view(view_id) {
1005                    let parsed = (view_data.parse)(wrapped.view.clone());
1006                    match parsed {
1007                        Ok(any_view) => {
1008                            log::trace!(
1009                                "View parsed successfully client={} tx={} view_id={}",
1010                                session.client_id,
1011                                tx_id,
1012                                view_id
1013                            );
1014                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
1015                            // block command processing or other messages.
1016                            let cell_factory = view_data.cell_factory;
1017                            let registry = registry.clone();
1018                            let ctx = ctx.clone();
1019                            let window = wrapped.window.clone();
1020                            let view_id_clone = view_id.clone();
1021                            let sub_tx = subscribe_tx.clone();
1022                            let priority_tx = priority_tx.clone();
1023                            let drop_logger = drop_logger.clone();
1024                            tokio::task::spawn_blocking(move || {
1025                                match cell_factory(any_view, registry, request_context, ctx) {
1026                                    Ok(filtered_cellmap) => {
1027                                        log::trace!(
1028                                            "View cell factory succeeded tx={} view_id={}",
1029                                            tx_id,
1030                                            view_id_clone
1031                                        );
1032                                        let _ = sub_tx.send(SubscriptionReady::View {
1033                                            tx_id,
1034                                            view_id: view_id_clone,
1035                                            cellmap: filtered_cellmap,
1036                                            window,
1037                                        });
1038                                    }
1039                                    Err(e) => {
1040                                        log::error!(
1041                                            "Failed to create view cell for {}: {}",
1042                                            view_id_clone,
1043                                            e
1044                                        );
1045                                        if let Err(err) = priority_tx.try_send(
1046                                            MykoMessage::ViewError(ViewError {
1047                                                tx: tx_id.to_string(),
1048                                                view_id: view_id_clone.to_string(),
1049                                                message: e,
1050                                            }),
1051                                        ) {
1052                                            drop_logger.on_drop("ViewError", &err);
1053                                        }
1054                                    }
1055                                }
1056                            });
1057                        }
1058                        Err(e) => {
1059                            let message = format!("Failed to parse view {}: {}", view_id, e);
1060                            log::error!(
1061                                "{} | payload: {}",
1062                                message,
1063                                serde_json::to_string(&wrapped.view).unwrap_or_default()
1064                            );
1065                            if let Err(err) =
1066                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
1067                                    tx: tx_id.to_string(),
1068                                    view_id: view_id.to_string(),
1069                                    message,
1070                                }))
1071                            {
1072                                drop_logger.on_drop("ViewError", &err);
1073                            }
1074                        }
1075                    }
1076                } else {
1077                    let message = format!("No registered handler for view: {}", view_id);
1078                    log::warn!("{}", message);
1079                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1080                        tx: tx_id.to_string(),
1081                        view_id: view_id.to_string(),
1082                        message,
1083                    })) {
1084                        drop_logger.on_drop("ViewError", &err);
1085                    }
1086                }
1087            }
1088
1089            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1090                log::trace!(
1091                    "QueryCancel received: client={} tx={}",
1092                    session.client_id,
1093                    tx_id
1094                );
1095                let tx_id: Arc<str> = tx_id.into();
1096                if let Ok(mut map) = query_ids_by_tx.lock() {
1097                    map.remove(&tx_id);
1098                }
1099                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1100                    map.remove(&tx_id);
1101                }
1102                session.cancel(&tx_id);
1103            }
1104
1105            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1106                let tx_id: Arc<str> = tx.into();
1107                log::trace!(
1108                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1109                    session.client_id,
1110                    tx_id,
1111                    window.is_some(),
1112                    session.subscription_count()
1113                );
1114                session.update_query_window(&tx_id, window);
1115            }
1116            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1117                log::trace!("View cancel: {}", tx_id);
1118                let tx_id: Arc<str> = tx_id.into();
1119                if let Ok(mut map) = view_ids_by_tx.lock() {
1120                    map.remove(&tx_id);
1121                }
1122                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1123                    map.remove(&tx_id);
1124                }
1125                session.cancel(&tx_id);
1126            }
1127            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1128                let tx_id: Arc<str> = tx.into();
1129                log::trace!("View window update: {}", tx_id);
1130                session.update_view_window(&tx_id, window);
1131            }
1132
1133            MykoMessage::Report(wrapped) => {
1134                // Extract tx from the report JSON
1135                let tx_id: Arc<str> = wrapped
1136                    .report
1137                    .get("tx")
1138                    .and_then(|v| v.as_str())
1139                    .unwrap_or("unknown")
1140                    .into();
1141                let report_id = &wrapped.report_id;
1142
1143                log::trace!(
1144                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1145                    session.client_id,
1146                    tx_id,
1147                    report_id,
1148                    session.subscription_count()
1149                );
1150
1151                // Look up the report registration
1152                if let Some(report_data) = handler_registry.get_report(report_id) {
1153                    // Parse the report JSON to the concrete type
1154                    let parsed = (report_data.parse)(wrapped.report.clone());
1155                    match parsed {
1156                        Ok(any_report) => {
1157                            let request_context = Arc::new(RequestContext::from_client(
1158                                tx_id.clone(),
1159                                session.client_id.clone(),
1160                                host_id,
1161                            ));
1162
1163                            // Create the cell using the factory (with host_id for context)
1164                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1165                                Ok(cell) => {
1166                                    session.subscribe_report(
1167                                        tx_id,
1168                                        report_id.as_str().into(),
1169                                        cell,
1170                                    );
1171                                }
1172                                Err(e) => {
1173                                    log::error!(
1174                                        "Failed to create report cell for {}: {}",
1175                                        report_id,
1176                                        e
1177                                    );
1178                                }
1179                            }
1180                        }
1181                        Err(e) => {
1182                            log::error!(
1183                                "Failed to parse report {}: {} | payload: {}",
1184                                report_id,
1185                                e,
1186                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1187                            );
1188                        }
1189                    }
1190                } else {
1191                    log::warn!("No registered handler for report: {}", report_id);
1192                }
1193            }
1194
1195            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1196                log::trace!(
1197                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1198                    session.client_id,
1199                    tx_id,
1200                    session.subscription_count()
1201                );
1202                session.cancel(&tx_id.into());
1203            }
1204
1205            MykoMessage::Event(mut event) => {
1206                record_ws_ingest(std::slice::from_ref(&event));
1207                event.sanitize_null_bytes();
1208                normalize_incoming_event(&mut event, &session.client_id, host_id);
1209                if let Err(e) = ctx.apply_event(event) {
1210                    log::error!(
1211                        "Failed to apply event from client {}: {e}",
1212                        session.client_id
1213                    );
1214                }
1215            }
1216
1217            MykoMessage::EventBatch(mut events) => {
1218                record_ws_ingest(&events);
1219                let incoming = events.len();
1220                if incoming >= 64 {
1221                    log::trace!(
1222                        "Received event batch from client {} size={}",
1223                        session.client_id,
1224                        incoming
1225                    );
1226                }
1227                for event in &mut events {
1228                    event.sanitize_null_bytes();
1229                    normalize_incoming_event(event, &session.client_id, host_id);
1230                }
1231                match ctx.apply_event_batch(events) {
1232                    Ok(applied) => {
1233                        log::trace!(
1234                            "Applied event batch from client {} size={}",
1235                            session.client_id,
1236                            applied
1237                        );
1238                    }
1239                    Err(e) => {
1240                        log::error!(
1241                            "Failed to apply event batch from client {}: {}",
1242                            session.client_id,
1243                            e
1244                        );
1245                    }
1246                }
1247            }
1248
1249            MykoMessage::Command(wrapped) => {
1250                // Extract tx from the command JSON
1251                let tx_id: Arc<str> = wrapped
1252                    .command
1253                    .get("tx")
1254                    .and_then(|v| v.as_str())
1255                    .unwrap_or("unknown")
1256                    .into();
1257
1258                let command_id = &wrapped.command_id;
1259
1260                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1261                let received_at = Instant::now();
1262                if let Ok(mut map) = command_started_by_tx.lock() {
1263                    map.insert(tx_id.clone(), received_at);
1264                }
1265                if let Err(e) = command_tx.send(CommandJob {
1266                    tx_id: tx_id.clone(),
1267                    command_id: wrapped.command_id.clone(),
1268                    command: wrapped.command.clone(),
1269                    received_at,
1270                }) {
1271                    log::error!(
1272                        "Failed to enqueue command {} for client {} tx={}: {}",
1273                        command_id,
1274                        session.client_id,
1275                        tx_id,
1276                        e
1277                    );
1278                    let error = MykoMessage::CommandError(CommandError {
1279                        tx: tx_id.to_string(),
1280                        command_id: command_id.to_string(),
1281                        message: "Command queue unavailable".to_string(),
1282                    });
1283                    if let Err(err) = priority_tx.try_send(error) {
1284                        drop_logger.on_drop("CommandError", &err);
1285                    }
1286                }
1287            }
1288
1289            MykoMessage::Ping(PingData { id, timestamp }) => {
1290                // Echo back the ping data
1291                let pong = MykoMessage::Ping(PingData { id, timestamp });
1292                if let Err(e) = priority_tx.try_send(pong) {
1293                    drop_logger.on_drop("Ping", &e);
1294                }
1295            }
1296
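1297            // Response-type messages: clients should not originate these, except CommandResponse/CommandError, which are correlated against `outbound_commands_by_tx` below.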
1298            MykoMessage::QueryResponse(resp) => {
1299                log::warn!(
1300                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1301                    session.client_id,
1302                    resp.tx,
1303                    resp.sequence,
1304                    resp.upserts.len(),
1305                    resp.deletes.len(),
1306                    session.subscription_count()
1307                );
1308            }
1309            MykoMessage::QueryError(err) => {
1310                log::warn!(
1311                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1312                    session.client_id,
1313                    err.tx,
1314                    err.query_id,
1315                    err.message,
1316                    session.subscription_count()
1317                );
1318            }
1319            MykoMessage::ViewResponse(resp) => {
1320                log::warn!(
1321                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1322                    session.client_id,
1323                    resp.tx,
1324                    resp.sequence,
1325                    resp.upserts.len(),
1326                    resp.deletes.len(),
1327                    session.subscription_count()
1328                );
1329            }
1330            MykoMessage::ViewError(err) => {
1331                log::warn!(
1332                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1333                    session.client_id,
1334                    err.tx,
1335                    err.view_id,
1336                    err.message,
1337                    session.subscription_count()
1338                );
1339            }
1340            MykoMessage::ReportResponse(resp) => {
1341                log::warn!(
1342                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1343                    session.client_id,
1344                    resp.tx,
1345                    session.subscription_count()
1346                );
1347            }
1348            MykoMessage::ReportError(err) => {
1349                log::warn!(
1350                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1351                    session.client_id,
1352                    err.tx,
1353                    err.report_id,
1354                    err.message,
1355                    session.subscription_count()
1356                );
1357            }
1358            MykoMessage::CommandResponse(resp) => {
1359                if resp.tx.trim().is_empty() {
1360                    log::warn!(
1361                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1362                        session.client_id,
1363                        session.subscription_count()
1364                    );
1365                } else {
1366                    let correlated = outbound_commands_by_tx
1367                        .lock()
1368                        .ok()
1369                        .and_then(|mut map| map.remove(&resp.tx));
1370                    if let Some((command_id, started)) = correlated {
1371                        log::debug!(
1372                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1373                            session.client_id,
1374                            resp.tx,
1375                            command_id,
1376                            started.elapsed().as_millis(),
1377                            session.subscription_count()
1378                        );
1379                    } else {
1380                        log::warn!(
1381                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1382                            session.client_id,
1383                            resp.tx,
1384                            session.subscription_count()
1385                        );
1386                    }
1387                }
1388            }
1389            MykoMessage::CommandError(err) => {
1390                if err.tx.trim().is_empty() {
1391                    log::warn!(
1392                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1393                        session.client_id,
1394                        err.command_id,
1395                        err.message,
1396                        session.subscription_count()
1397                    );
1398                } else {
1399                    let correlated = outbound_commands_by_tx
1400                        .lock()
1401                        .ok()
1402                        .and_then(|mut map| map.remove(&err.tx));
1403                    if let Some((command_id, started)) = correlated {
1404                        log::warn!(
1405                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1406                            session.client_id,
1407                            err.tx,
1408                            err.command_id,
1409                            command_id,
1410                            err.message,
1411                            started.elapsed().as_millis(),
1412                            session.subscription_count()
1413                        );
1414                    } else {
1415                        log::warn!(
1416                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1417                            session.client_id,
1418                            err.tx,
1419                            err.command_id,
1420                            err.message,
1421                            session.subscription_count()
1422                        );
1423                    }
1424                }
1425            }
1426            MykoMessage::Benchmark(payload) => {
1427                let stats = ws_benchmark_stats();
1428                ensure_ws_benchmark_logger();
1429                stats.message_count.fetch_add(1, Ordering::Relaxed);
1430                // Estimate payload size from the JSON value
1431                let size = payload.to_string().len() as u64;
1432                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1433            }
1434        }
1435
1436        Ok(())
1437    }
1438
1439    fn execute_command_job(
1440        ctx: Arc<CellServerCtx>,
1441        priority_tx: &mpsc::Sender<MykoMessage>,
1442        drop_logger: &DropLogger,
1443        client_id: Arc<str>,
1444        job: CommandJob,
1445    ) {
1446        let host_id = ctx.host_id;
1447        let started = Instant::now();
1448        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1449        let command_id = job.command_id.clone();
1450
1451        let mut handler_found = false;
1452        for registration in inventory::iter::<CommandHandlerRegistration> {
1453            if registration.command_id == command_id {
1454                handler_found = true;
1455                let executor = (registration.factory)();
1456
1457                let req = Arc::new(RequestContext::from_client(
1458                    job.tx_id.clone(),
1459                    client_id.clone(),
1460                    host_id,
1461                ));
1462                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1463                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1464                let execute_started = Instant::now();
1465
1466                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1467                    Ok(result) => {
1468                        let response = MykoMessage::CommandResponse(CommandResponse {
1469                            response: result,
1470                            tx: job.tx_id.to_string(),
1471                        });
1472                        if let Err(e) = priority_tx.try_send(response) {
1473                            drop_logger.on_drop("CommandResponse", &e);
1474                        }
1475                    }
1476                    Err(e) => {
1477                        let error = MykoMessage::CommandError(CommandError {
1478                            tx: job.tx_id.to_string(),
1479                            command_id: command_id.clone(),
1480                            message: e.message,
1481                        });
1482                        if let Err(err) = priority_tx.try_send(error) {
1483                            drop_logger.on_drop("CommandError", &err);
1484                        }
1485                    }
1486                }
1487                let execute_ms = execute_started.elapsed().as_millis();
1488                let total_ms = job.received_at.elapsed().as_millis();
1489                log::trace!(
1490                    target: "myko_server::ws_perf",
1491                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1492                    client_id,
1493                    job.tx_id,
1494                    command_id,
1495                    queue_wait_ms,
1496                    execute_ms,
1497                    total_ms
1498                );
1499                break;
1500            }
1501        }
1502
1503        if !handler_found {
1504            log::warn!("No registered handler for command: {}", command_id);
1505            let error = MykoMessage::CommandError(CommandError {
1506                tx: job.tx_id.to_string(),
1507                command_id: command_id.clone(),
1508                message: format!("Command handler not found: {}", command_id),
1509            });
1510            if let Err(e) = priority_tx.try_send(error) {
1511                drop_logger.on_drop("CommandError", &e);
1512            }
1513        }
1514
1515        if !handler_found {
1516            log::debug!(
1517                target: "myko_server::ws_perf",
1518                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1519                client_id,
1520                job.tx_id,
1521                command_id,
1522                queue_wait_ms,
1523                job.received_at.elapsed().as_millis()
1524            );
1525        }
1526    }
1527}
1528
/// Channel-based WebSocket writer.
///
/// Sends messages through an mpsc channel which are then
/// forwarded to the actual WebSocket.
struct ChannelWriter {
    /// Sender for `OutboundMessage` values. `send` and
    /// `send_serialized_command` enqueue here with `try_send`.
    tx: mpsc::Sender<OutboundMessage>,
    /// Sender for `DeferredOutbound` values. `send_report_response` and
    /// `send_query_response` enqueue here with `try_send`.
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Receives an `on_drop` call whenever a `try_send` on either channel
    /// fails for a reason other than the channel being closed.
    drop_logger: Arc<DropLogger>,
    /// Outgoing wire format stored as a `u8`; `protocol()` loads it and
    /// converts it to a `MykoProtocol`.
    outgoing_format: Arc<AtomicU8>,
}
1539
impl ChannelWriter {
    /// Cheap "is the writer task gone?" check used to short-circuit
    /// subscriber callbacks for a disconnected client. `Sender::is_closed`
    /// returns true once the matching receiver is dropped, which happens as
    /// soon as the write task exits. Avoiding the work here prevents the
    /// "buffer full / channel closed" log storm we used to see for 10+
    /// seconds after every disconnect while the session was still tearing
    /// down its subscription guards.
    #[inline]
    fn tx_dead(&self) -> bool {
        self.tx.is_closed()
    }

    /// Counterpart of `tx_dead` for the deferred channel: reports whether
    /// `deferred_tx` is closed. `send_report_response` and
    /// `send_query_response` return early when this is true.
    #[inline]
    fn deferred_dead(&self) -> bool {
        self.deferred_tx.is_closed()
    }
}
1558
1559impl WsWriter for ChannelWriter {
1560    fn send(&self, msg: MykoMessage) {
1561        // Fast path: writer is gone. Don't try to send and don't log; the
1562        // dead-channel state is expected after the client disconnects, and
1563        // a dropped subscription will follow shortly when the session
1564        // teardown finishes. Avoiding the log+counter prevents the
1565        // "buffer full / channel closed" warning storm we used to see for
1566        // 10+ seconds after every disconnect.
1567        if self.tx_dead() {
1568            return;
1569        }
1570        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1571            // Closed errors here race with the writer task exiting between
1572            // the is_dead check and the try_send; suppress them too.
1573            if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1574                self.drop_logger.on_drop("message", &e);
1575            }
1576        }
1577    }
1578
1579    fn protocol(&self) -> MykoProtocol {
1580        MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1581    }
1582
1583    fn send_serialized_command(
1584        &self,
1585        tx: Arc<str>,
1586        command_id: String,
1587        payload: EncodedCommandMessage,
1588    ) {
1589        if self.tx_dead() {
1590            return;
1591        }
1592        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1593            tx,
1594            command_id,
1595            payload,
1596        }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1597        {
1598            self.drop_logger.on_drop("serialized_command", &e);
1599        }
1600    }
1601
1602    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1603        if self.deferred_dead() {
1604            return;
1605        }
1606        if let Err(e) = self
1607            .deferred_tx
1608            .try_send(DeferredOutbound::Report(tx, output))
1609            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1610        {
1611            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1612        }
1613    }
1614
1615    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1616        if self.deferred_dead() {
1617            return;
1618        }
1619        if let Err(e) = self
1620            .deferred_tx
1621            .try_send(DeferredOutbound::Query { response, is_view })
1622            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1623        {
1624            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1625        }
1626    }
1627}
1628
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` must show up on the
    /// outbound channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // Keep the deferred receiver alive so the deferred channel stays open.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format: Arc::new(AtomicU8::new(MykoProtocol::JSON as u8)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let delivered = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    /// The outgoing-format cell starts as JSON and reads back as CBOR after
    /// a CBOR store.
    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        let outgoing_format = AtomicU8::new(MykoProtocol::JSON as u8);
        let current = || MykoProtocol::from(outgoing_format.load(Ordering::SeqCst));

        // Initially JSON.
        assert_eq!(current(), MykoProtocol::JSON);

        // Simulate receiving a binary frame: promote.
        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(current(), MykoProtocol::CBOR);

        // Simulate receiving more text frames after promotion: no change.
        // (The handler in the read loop only writes on Binary, never on Text,
        // so this is a no-op assertion that the field's last-write-wins
        // semantics give us stickiness for free.)
        assert_eq!(current(), MykoProtocol::CBOR);
    }
}