Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    client::MykoProtocol,
22    command::{CommandContext, CommandHandlerRegistration},
23    entities::client::{Client, ClientId},
24    relationship::{
25        iter_client_id_registrations, iter_fallback_to_id_registrations,
26        iter_server_owned_registrations,
27    },
28    report::AnyOutput,
29    request::RequestContext,
30    server::{
31        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
32        client_registry::try_client_registry,
33    },
34    wire::{
35        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
36        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
37    },
38};
39use tokio::{net::TcpStream, sync::mpsc, time::interval};
40use tokio_tungstenite::{
41    accept_async_with_config,
42    tungstenite::{Message, protocol::WebSocketConfig},
43};
44use uuid::Uuid;
45
/// Process-wide ingest counters, one per event `item_type`.
///
/// `record_ws_ingest` increments the counters; the "ws-ingest-logger" thread
/// started by `ensure_ws_ingest_logger` swaps each one back to zero once per
/// second and logs the non-zero values.
struct WsIngestStats {
    // Key: the event's `item_type` string. Value: number of events of that
    // type seen since the logger thread last drained the counter.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide message/byte counters reported by the "ws-benchmark-logger"
/// thread.
///
/// The logger thread (see `ensure_ws_benchmark_logger`) swaps both counters
/// back to zero once per second and logs the totals plus the average size.
// NOTE(review): the code that increments these counters is not in this part
// of the file — confirm where messages are recorded.
struct WsBenchmarkStats {
    // Messages counted since the last drain.
    message_count: AtomicU64,
    // Bytes counted since the last drain.
    total_bytes: AtomicU64,
}
54
/// Shared ingest counters; lazily created by `ws_ingest_stats`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
/// Flipped to `true` by `ensure_ws_ingest_logger` so the ingest logger thread
/// is spawned at most once per process.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
/// Shared benchmark counters; lazily created by `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Flipped to `true` by `ensure_ws_benchmark_logger` so the benchmark logger
/// thread is spawned at most once per process.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    bytes / count
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[server_owned] fields with this server's ID
192    for reg in iter_server_owned_registrations() {
193        if reg.entity_type == event.item_type {
194            if let Some(obj) = event.item.as_object_mut() {
195                let field = reg.field_name_json;
196                let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
197                if current.is_empty() {
198                    obj.insert(
199                        field.to_string(),
200                        serde_json::Value::String(host_id.to_string()),
201                    );
202                }
203            }
204            break;
205        }
206    }
207
208    // Auto-populate #[fallback_to_id] fields with the entity's own id
209    // if the field is null or missing.
210    if let Some(obj) = event.item.as_object_mut()
211        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
212    {
213        for reg in iter_fallback_to_id_registrations() {
214            if reg.entity_type == event.item_type {
215                let field = reg.field_name_json;
216                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
217                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
218                }
219            }
220        }
221    }
222}
223
/// Rate-limited reporter for outbound messages dropped on one connection.
///
/// A client that falls behind fills its outbound channel and messages get
/// dropped. Emitting a `warn!` for each drop would flood the log and could
/// effectively DoS the server, so drops are counted here and reported in
/// batches instead.
struct DropLogger {
    /// Client the drops belong to; included in the warning text.
    client_id: Arc<str>,
    /// Drops counted since the last warning was emitted.
    dropped: AtomicU64,
    /// Millisecond timestamp of the most recent warning (0 before the first).
    last_log_ms: AtomicU64,
}
233
234impl DropLogger {
235    fn new(client_id: Arc<str>) -> Self {
236        Self {
237            client_id,
238            dropped: std::sync::atomic::AtomicU64::new(0),
239            last_log_ms: std::sync::atomic::AtomicU64::new(0),
240        }
241    }
242
243    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
244        use std::sync::atomic::Ordering;
245
246        self.dropped.fetch_add(1, Ordering::Relaxed);
247
248        // Log at most once per second per connection.
249        let now_ms = std::time::SystemTime::now()
250            .duration_since(std::time::UNIX_EPOCH)
251            .unwrap_or_default()
252            .as_millis() as u64;
253        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
254        if now_ms.saturating_sub(last_ms) < 1000 {
255            return;
256        }
257
258        if self
259            .last_log_ms
260            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
261            .is_err()
262        {
263            return;
264        }
265
266        let n = self.dropped.swap(0, Ordering::Relaxed);
267        log::warn!(
268            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
269            n,
270            self.client_id,
271            kind,
272            err
273        );
274    }
275}
276
/// One unit of work for the per-connection command worker.
///
/// Jobs travel over the connection's unbounded command channel; the worker
/// task runs each one via `WsHandler::execute_command_job` on the blocking
/// thread pool.
struct CommandJob {
    // Transaction id of the request; the worker also uses it to clear the
    // matching entry in the command start-time map once the job finishes.
    tx_id: Arc<str>,
    // Identifier of the command to run.
    // NOTE(review): presumably selects the registered command handler — confirm.
    command_id: String,
    // Command payload as JSON.
    command: serde_json::Value,
    // NOTE(review): presumably the instant the command arrived, used for
    // latency measurement — confirm against `execute_command_job`.
    received_at: Instant,
}
283
/// Result of an async subscription build (query or view cell_factory).
///
/// Delivered to the connection loop over the subscribe channel, which clears
/// the subscription's start-time entry and registers the cell map with the
/// `ClientSession`.
enum SubscriptionReady {
    /// A query subscription; registered via `ClientSession::subscribe_query`.
    Query {
        // Transaction id of the subscribe request.
        tx_id: Arc<str>,
        // Cell map produced by the build, handed to the session as-is.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window forwarded to the session with the subscription.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view subscription; registered via
    /// `ClientSession::subscribe_view_with_id`.
    View {
        // Transaction id of the subscribe request.
        tx_id: Arc<str>,
        // Id of the view being subscribed to.
        view_id: Arc<str>,
        // Cell map produced by the build, handed to the session as-is.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window forwarded to the session with the subscription.
        window: Option<myko::wire::QueryWindow>,
    },
}
298
/// A message queued for the connection's write task.
enum OutboundMessage {
    /// A message the write task serializes itself: CBOR when the session's
    /// outgoing format has been promoted to CBOR, JSON otherwise.
    Message(MykoMessage),
    /// A command whose payload is already encoded. The write task sends a
    /// JSON payload as a text frame and a CBOR payload as a binary frame.
    SerializedCommand {
        // Transaction id; when non-blank the write task records it (with
        // `command_id` and the send time) in the outbound command map.
        tx: Arc<str>,
        // Identifier of the command being sent.
        command_id: String,
        // Pre-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
307
/// Outbound payloads whose conversion to a wire message is deferred to the
/// write task.
enum DeferredOutbound {
    /// Report output for the given transaction id; the write task turns it
    /// into a `ReportResponse` by calling `to_value()` on the output.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// A pending query/view response; the write task calls `into_wire()` and
    /// wraps the result as `ViewResponse` when `is_view` is true, otherwise
    /// as `QueryResponse`.
    Query {
        response: PendingQueryResponse,
        is_view: bool,
    },
}
315
/// WebSocket handler for a single client connection.
///
/// This is a unit struct with no state of its own; connection handling is
/// exposed through associated functions such as `handle_connection`.
pub struct WsHandler;
318
319impl WsHandler {
320    /// Handle a new WebSocket connection.
321    #[allow(clippy::too_many_arguments)]
322    pub async fn handle_connection(
323        stream: TcpStream,
324        addr: SocketAddr,
325        ctx: Arc<CellServerCtx>,
326    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
327        ensure_ws_ingest_logger();
328
329        let host_id = ctx.host_id;
330
331        let mut ws_config = WebSocketConfig::default();
332        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
333        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
334        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
335        let (mut write, mut read) = ws_stream.split();
336
337        // Create a bounded channel for sending messages to the client
338        // High limit (10k) since we have good memory availability
339        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
340        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
341        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
342        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
343        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
344
345        // Outgoing format for this session: defaults to JSON, sticky-promotes
346        // to CBOR on the first received binary frame. Never demotes.
347        let outgoing_format = Arc::new(AtomicU8::new(MykoProtocol::JSON as u8));
348
349        // Create client session with channel-based writer
350        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
351        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
352        let writer = ChannelWriter {
353            tx: tx.clone(),
354            deferred_tx: deferred_tx.clone(),
355            drop_logger: drop_logger.clone(),
356            outgoing_format: outgoing_format.clone(),
357        };
358
359        // Register writer in the global client registry (if initialized)
360        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
361            tx: tx.clone(),
362            deferred_tx: deferred_tx.clone(),
363            drop_logger: drop_logger.clone(),
364            outgoing_format: outgoing_format.clone(),
365        });
366        if let Some(registry) = try_client_registry() {
367            registry.register(client_id.clone(), writer_arc);
368        }
369
370        let mut session = ClientSession::new(client_id.clone(), writer);
371
372        let outgoing_format_writer = outgoing_format.clone();
373        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
374            Arc::new(Mutex::new(HashMap::new()));
375        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
376            Arc::new(Mutex::new(HashMap::new()));
377        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
378            Arc::new(Mutex::new(HashMap::new()));
379        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
380            Arc::new(Mutex::new(HashMap::new()));
381        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
382            Arc::new(Mutex::new(HashMap::new()));
383
384        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
385
386        // Publish Client entity
387        let client_entity = Client {
388            id: ClientId(client_id.clone()),
389            server_id: host_id.to_string().into(),
390            address: Some(Arc::from(addr.to_string())),
391            windback: None,
392        };
393        if let Err(e) = ctx.set(&client_entity) {
394            log::error!("Failed to persist client entity: {e}");
395        }
396
397        log::info!("Client connected: {} from {}", client_id, addr);
398
399        let write_ctx = ctx.clone();
400        let write_client_id = client_id.clone();
401        let write_addr = addr;
402        let command_ctx = ctx.clone();
403        let command_priority_tx = priority_tx.clone();
404        let command_drop_logger = drop_logger.clone();
405        let command_client_id = client_id.clone();
406
407        // Spawn task to forward messages from channel to WebSocket
408        let write_task = tokio::spawn(async move {
409            let _ctx = write_ctx;
410            let mut normal_open = true;
411            let mut priority_open = true;
412            let mut deferred_open = true;
413            while normal_open || priority_open || deferred_open {
414                let msg = tokio::select! {
415                    biased;
416                    maybe = priority_rx.recv(), if priority_open => {
417                        match maybe {
418                            Some(msg) => OutboundMessage::Message(msg),
419                            None => {
420                                priority_open = false;
421                                continue;
422                            }
423                        }
424                    }
425                    maybe = deferred_rx.recv(), if deferred_open => {
426                        match maybe {
427                            Some(DeferredOutbound::Report(tx, output)) => {
428                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
429                                    response: output.to_value(),
430                                    tx: tx.to_string(),
431                                }))
432                            }
433                            Some(DeferredOutbound::Query { response, is_view }) => {
434                                if is_view {
435                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
436                                } else {
437                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
438                                }
439                            }
440                            None => {
441                                deferred_open = false;
442                                continue;
443                            }
444                        }
445                    }
446                    maybe = rx.recv(), if normal_open => {
447                        match maybe {
448                            Some(msg) => msg,
449                            None => {
450                                normal_open = false;
451                                continue;
452                            }
453                        }
454                    }
455                };
456                // Per-message timing tap (outbound). Counts by kind into the
457                // global ws_timing instrumentation. Counterpart to the inbound
458                // tap in `handle_message`.
459                match &msg {
460                    OutboundMessage::SerializedCommand { .. } => {
461                        crate::ws_timing::record_outbound("Command")
462                    }
463                    OutboundMessage::Message(m) => {
464                        crate::ws_timing::record_outbound(crate::ws_timing::message_kind(m))
465                    }
466                };
467                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
468                    OutboundMessage::SerializedCommand { tx, .. } => {
469                        ("command", Some(tx.clone()), None, None, None, None)
470                    }
471                    OutboundMessage::Message(msg) => match msg {
472                        MykoMessage::ViewResponse(r) => (
473                            "view_response",
474                            Some(r.tx.clone()),
475                            Some(r.sequence),
476                            Some(r.upserts.len()),
477                            Some(r.deletes.len()),
478                            r.total_count,
479                        ),
480                        MykoMessage::QueryResponse(r) => (
481                            "query_response",
482                            Some(r.tx.clone()),
483                            Some(r.sequence),
484                            Some(r.upserts.len()),
485                            Some(r.deletes.len()),
486                            r.total_count,
487                        ),
488                        MykoMessage::CommandResponse(r) => (
489                            "command_response",
490                            Some(Arc::<str>::from(r.tx.clone())),
491                            None,
492                            None,
493                            None,
494                            None,
495                        ),
496                        MykoMessage::CommandError(r) => (
497                            "command_error",
498                            Some(Arc::<str>::from(r.tx.clone())),
499                            None,
500                            None,
501                            None,
502                            None,
503                        ),
504                        _ => ("other", None, None, None, None, None),
505                    },
506                };
507
508                match &msg {
509                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
510                        if !tx.trim().is_empty()
511                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
512                        {
513                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
514                        }
515                    }
516                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
517                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
518                            && !tx_id.trim().is_empty()
519                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
520                        {
521                            map.insert(
522                                tx_id.to_string(),
523                                (wrapped.command_id.clone(), Instant::now()),
524                            );
525                        }
526                    }
527                    _ => {}
528                }
529
530                let ws_msg = match &msg {
531                    OutboundMessage::SerializedCommand {
532                        payload: EncodedCommandMessage::Json(json),
533                        ..
534                    } => Message::Text(json.clone().into()),
535                    OutboundMessage::SerializedCommand {
536                        payload: EncodedCommandMessage::Cbor(bytes),
537                        ..
538                    } => Message::Binary(bytes.clone().into()),
539                    OutboundMessage::Message(msg)
540                        if outgoing_format_writer.load(Ordering::SeqCst)
541                            == MykoProtocol::CBOR as u8 =>
542                    {
543                        let mut bytes = Vec::new();
544                        match ciborium::ser::into_writer(msg, &mut bytes) {
545                            Ok(()) => Message::Binary(bytes.into()),
546                            Err(e) => {
547                                log::error!("Failed to serialize message to CBOR: {}", e);
548                                continue;
549                            }
550                        }
551                    }
552                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
553                        Ok(json) => Message::Text(json.into()),
554                        Err(e) => {
555                            log::error!("Failed to serialize message to JSON: {}", e);
556                            continue;
557                        }
558                    },
559                };
560                let payload_bytes = match &ws_msg {
561                    Message::Binary(b) => b.len(),
562                    Message::Text(t) => t.len(),
563                    _ => 0,
564                };
565
566                if let Err(err) = write.send(ws_msg).await {
567                    log::error!(
568                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
569                        write_client_id,
570                        write_addr,
571                        kind,
572                        tx_id,
573                        seq,
574                        payload_bytes,
575                        outgoing_format_writer.load(Ordering::SeqCst) == MykoProtocol::CBOR as u8,
576                        err
577                    );
578                    break;
579                }
580            }
581            // NOTE(ts): Unregister from client registry immediately so the node
582            // executor stops serializing commands into a dead channel.
583            if let Some(registry) = try_client_registry() {
584                registry.unregister(&write_client_id);
585                log::info!(
586                    "WebSocket writer unregistered client {} from {} (write task exiting)",
587                    write_client_id,
588                    write_addr,
589                );
590            }
591            log::warn!(
592                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
593                write_client_id,
594                write_addr,
595                normal_open,
596                priority_open,
597                deferred_open
598            );
599        });
600
601        // Execute commands on a dedicated worker so ping/cancel traffic is never
602        // blocked by long-running command handlers.
603        let command_started_cleanup = command_started_by_tx.clone();
604        let command_task = tokio::spawn(async move {
605            while let Some(job) = command_rx.recv().await {
606                let command_ctx = command_ctx.clone();
607                let command_priority_tx = command_priority_tx.clone();
608                let command_drop_logger = command_drop_logger.clone();
609                let command_client_id = command_client_id.clone();
610                let tx_id = job.tx_id.clone();
611                let started_map = command_started_cleanup.clone();
612                match tokio::task::spawn_blocking(move || {
613                    Self::execute_command_job(
614                        command_ctx,
615                        &command_priority_tx,
616                        command_drop_logger.as_ref(),
617                        command_client_id,
618                        job,
619                    );
620                })
621                .await
622                {
623                    Ok(()) => {}
624                    Err(e) => {
625                        log::error!("Command worker panicked: {}", e);
626                    }
627                }
628                // NOTE(ts): Clean up timing entry after command completes (success or panic).
629                if let Ok(mut map) = started_map.lock() {
630                    map.remove(&tx_id);
631                }
632            }
633        });
634
635        // Process incoming messages and completed subscription builds concurrently.
636        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
637        // so they don't block command processing or other messages.
638        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
639        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
640        loop {
641            tokio::select! {
642                // Completed subscription builds — register with session
643                Some(ready) = subscribe_rx.recv() => {
644                    let tx_id = match &ready {
645                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
646                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
647                    };
648                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
649                        map.remove(&tx_id);
650                    }
651                    match ready {
652                        SubscriptionReady::Query { tx_id, cellmap, window } => {
653                            session.subscribe_query(tx_id, cellmap, window);
654                        }
655                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
656                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
657                        }
658                    }
659                }
660                // NOTE(ts): Sweep outbound command entries older than 10s.
661                // Responses normally arrive quickly; stale entries are from
662                // dropped connections or commands that will never get a response.
663                _ = outbound_ttl_interval.tick() => {
664                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
665                        let before = map.len();
666                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
667                        let removed = before - map.len();
668                        if removed > 0 {
669                            log::debug!(
670                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
671                                session.client_id,
672                                removed,
673                                map.len()
674                            );
675                        }
676                    }
677                }
678                // Incoming WebSocket messages
679                msg = read.next() => {
680                    let Some(msg) = msg else { break };
681                    let ctx = ctx.clone();
682                    let msg = match msg {
683                        Ok(m) => m,
684                        Err(e) => {
685                            log::error!("WebSocket read error from {}: {}", client_id, e);
686                            break;
687                        }
688                    };
689
690                    match msg {
691                        Message::Binary(data) => {
692                            if outgoing_format.load(Ordering::SeqCst) != MykoProtocol::CBOR as u8 {
693                                log::debug!(
694                                    "Client {} promoted outgoing format to CBOR via demonstration",
695                                    client_id
696                                );
697                                outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
698                            }
699
700                            match ciborium::de::from_reader::<MykoMessage, _>(data.as_ref()) {
701                                Ok(myko_msg) => {
702                                    if let Err(e) = Self::handle_message(
703                                        &mut session,
704                                        ctx,
705                                        &priority_tx,
706                                        &drop_logger,
707                                        &query_ids_by_tx,
708                                        &view_ids_by_tx,
709                                        &subscribe_started_by_tx,
710                                        &command_started_by_tx,
711                                        &outbound_commands_by_tx,
712                                        &command_tx,
713                                        &subscribe_tx,
714                                        myko_msg,
715                                    ) {
716                                        log::error!("Error handling message: {}", e);
717                                    }
718                                    tokio::task::yield_now().await;
719                                }
720                                Err(e) => {
721                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
722                                }
723                            }
724                        }
725                        Message::Text(text) => {
726                            match serde_json::from_str::<MykoMessage>(&text) {
727                                Ok(myko_msg) => {
728                                    if let Err(e) = Self::handle_message(
729                                        &mut session,
730                                        ctx,
731                                        &priority_tx,
732                                        &drop_logger,
733                                        &query_ids_by_tx,
734                                        &view_ids_by_tx,
735                                        &subscribe_started_by_tx,
736                                        &command_started_by_tx,
737                                        &outbound_commands_by_tx,
738                                        &command_tx,
739                                        &subscribe_tx,
740                                        myko_msg,
741                                    ) {
742                                        log::error!("Error handling message: {}", e);
743                                    }
744                                    tokio::task::yield_now().await;
745                                }
746                                Err(e) => {
747                                    log::warn!(
748                                        "Failed to parse JSON message from {}: {} | raw: {}",
749                                        client_id,
750                                        e,
751                                        if text.len() > 1000 {
752                                            &text[..1000]
753                                        } else {
754                                            &text
755                                        }
756                                    );
757                                }
758                            }
759                        }
760                        Message::Ping(data) => {
761                            log::trace!("Ping from {}", client_id);
762                            let _ = data;
763                        }
764                        Message::Pong(_) => {
765                            log::trace!("Pong from {}", client_id);
766                        }
767                        Message::Close(frame) => {
768                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
769                            break;
770                        }
771                        Message::Frame(_) => {}
772                    }
773                }
774            }
775        }
776
777        // Cleanup. Order matters:
778        // 1. Abort the write/command tasks first so their channel receivers
779        //    are dropped — `ChannelWriter` clones held by subscriber
780        //    callbacks then short-circuit via `tx_dead()` / `deferred_dead()`
781        //    instead of doing useless work and emitting log spam while we
782        //    tear down the session.
783        // 2. Unregister the client writer from the global registry so other
784        //    parts of the system stop dispatching commands here.
785        // 3. Move the session drop to a blocking thread. `drop(session)`
786        //    cancels each subscription guard, and hyphae's per-cell
787        //    unsubscribe is O(N) in the cell's subscriber list (Vec clone +
788        //    filter + ArcSwap store), so a session with thousands of
789        //    subscriptions can take significant CPU time. Doing it on the
790        //    async runtime would block other connections' tasks; the
791        //    blocking pool is the right place for this.
792        write_task.abort();
793        command_task.abort();
794        if let Some(registry) = try_client_registry() {
795            registry.unregister(&client_id);
796        }
797
798        let drop_client_id = client_id.clone();
799        tokio::task::spawn_blocking(move || {
800            drop(session); // Drops all subscription guards
801            log::trace!(
802                "Client session subscriptions torn down for {}",
803                drop_client_id
804            );
805        });
806
807        // Delete Client entity
808        if let Err(e) = ctx.del(&client_entity) {
809            log::error!("Failed to delete client entity: {e}");
810        }
811
812        log::info!("Client disconnected: {} from {}", client_id, addr);
813
814        Ok(())
815    }
816
817    /// Handle a parsed MykoMessage.
818    #[allow(clippy::too_many_arguments)]
819    fn handle_message<W: WsWriter>(
820        session: &mut ClientSession<W>,
821        ctx: Arc<CellServerCtx>,
822        priority_tx: &mpsc::Sender<MykoMessage>,
823        drop_logger: &Arc<DropLogger>,
824        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
825        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
826        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
827        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
828        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
829        command_tx: &mpsc::UnboundedSender<CommandJob>,
830        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
831        msg: MykoMessage,
832    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
833        // Per-message timing tap. Counts inbound messages by kind into the
834        // global instrumentation. Periodic summarizer logs the deltas.
835        crate::ws_timing::record_inbound(crate::ws_timing::message_kind(&msg));
836
837        let handler_registry = ctx.handler_registry.clone();
838
839        let registry = ctx.registry.clone();
840
841        let host_id = ctx.host_id;
842
843        match msg {
844            MykoMessage::Query(wrapped) => {
845                // Extract tx from the query JSON
846                let tx_id: Arc<str> = wrapped
847                    .query
848                    .get("tx")
849                    .and_then(|v| v.as_str())
850                    .unwrap_or("unknown")
851                    .into();
852                let query_id = &wrapped.query_id;
853                let entity_type = &wrapped.query_item_type;
854
855                // Defensive dedupe: some clients can replay the same subscribe request.
856                // A duplicate for the same tx would reset server-side sequence/window state.
857                if session.has_subscription(&tx_id) {
858                    log::debug!(
859                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
860                        session.client_id,
861                        tx_id,
862                        query_id,
863                        entity_type
864                    );
865                    return Ok(());
866                }
867                if let Ok(mut map) = query_ids_by_tx.lock() {
868                    map.insert(tx_id.clone(), query_id.clone());
869                }
870                if let Ok(mut map) = subscribe_started_by_tx.lock() {
871                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
872                }
873
874                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
875                log::trace!(
876                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
877                    session.client_id,
878                    tx_id,
879                    query_id,
880                    entity_type,
881                    wrapped.window.is_some(),
882                    session.subscription_count()
883                );
884
885                let request_context = Arc::new(RequestContext::from_client(
886                    tx_id.clone(),
887                    session.client_id.clone(),
888                    host_id,
889                ));
890
891                if let Some(query_data) = handler_registry.get_query(query_id) {
892                    let parsed = (query_data.parse)(wrapped.query.clone());
893                    match parsed {
894                        Ok(any_query) => {
895                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
896                            // block command processing or other messages.
897                            let cell_factory = query_data.cell_factory;
898                            let registry = registry.clone();
899                            let request_context = request_context.clone();
900                            let ctx = ctx.clone();
901                            let window = wrapped.window.clone();
902                            let query_id = query_id.clone();
903                            let sub_tx = subscribe_tx.clone();
904                            tokio::task::spawn_blocking(move || {
905                                match cell_factory(any_query, registry, request_context, Some(ctx))
906                                {
907                                    Ok(filtered_cellmap) => {
908                                        let _ = sub_tx.send(SubscriptionReady::Query {
909                                            tx_id,
910                                            cellmap: filtered_cellmap,
911                                            window,
912                                        });
913                                    }
914                                    Err(e) => {
915                                        log::error!(
916                                            "Failed to create query cell for {}: {}",
917                                            query_id,
918                                            e
919                                        );
920                                    }
921                                }
922                            });
923                        }
924                        Err(e) => {
925                            log::error!(
926                                "Failed to parse query {}: {} | payload: {}",
927                                query_id,
928                                e,
929                                serde_json::to_string(&wrapped.query).unwrap_or_default()
930                            );
931                        }
932                    }
933                } else {
934                    // Fall back to select all for unknown queries
935                    log::warn!(
936                        "No registered query handler for {}, falling back to select all",
937                        query_id
938                    );
939                    let store: myko::store::EntityStore =
940                        (*registry.get_or_create(entity_type)).clone();
941                    let cellmap = hyphae::MapQuery::materialize(store.select(|_| true));
942                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
943                }
944            }
945
946            MykoMessage::View(wrapped) => {
947                let tx_id: Arc<str> = wrapped
948                    .view
949                    .get("tx")
950                    .and_then(|v| v.as_str())
951                    .unwrap_or("unknown")
952                    .into();
953                let view_id = &wrapped.view_id;
954                let item_type = &wrapped.view_item_type;
955
956                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
957                if session.has_subscription(&tx_id) {
958                    log::debug!(
959                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
960                        session.client_id,
961                        tx_id,
962                        view_id,
963                        item_type
964                    );
965                    return Ok(());
966                }
967                if let Ok(mut map) = view_ids_by_tx.lock() {
968                    map.insert(tx_id.clone(), view_id.clone());
969                }
970                if let Ok(mut map) = subscribe_started_by_tx.lock() {
971                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
972                }
973
974                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
975                log::trace!(
976                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
977                    session.client_id,
978                    tx_id,
979                    view_id,
980                    item_type,
981                    wrapped.window
982                );
983
984                let request_context = Arc::new(RequestContext::from_client(
985                    tx_id.clone(),
986                    session.client_id.clone(),
987                    host_id,
988                ));
989
990                if let Some(view_data) = handler_registry.get_view(view_id) {
991                    let parsed = (view_data.parse)(wrapped.view.clone());
992                    match parsed {
993                        Ok(any_view) => {
994                            log::trace!(
995                                "View parsed successfully client={} tx={} view_id={}",
996                                session.client_id,
997                                tx_id,
998                                view_id
999                            );
1000                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
1001                            // block command processing or other messages.
1002                            let cell_factory = view_data.cell_factory;
1003                            let registry = registry.clone();
1004                            let ctx = ctx.clone();
1005                            let window = wrapped.window.clone();
1006                            let view_id_clone = view_id.clone();
1007                            let sub_tx = subscribe_tx.clone();
1008                            let priority_tx = priority_tx.clone();
1009                            let drop_logger = drop_logger.clone();
1010                            tokio::task::spawn_blocking(move || {
1011                                match cell_factory(any_view, registry, request_context, ctx) {
1012                                    Ok(filtered_cellmap) => {
1013                                        log::trace!(
1014                                            "View cell factory succeeded tx={} view_id={}",
1015                                            tx_id,
1016                                            view_id_clone
1017                                        );
1018                                        let _ = sub_tx.send(SubscriptionReady::View {
1019                                            tx_id,
1020                                            view_id: view_id_clone,
1021                                            cellmap: filtered_cellmap,
1022                                            window,
1023                                        });
1024                                    }
1025                                    Err(e) => {
1026                                        log::error!(
1027                                            "Failed to create view cell for {}: {}",
1028                                            view_id_clone,
1029                                            e
1030                                        );
1031                                        if let Err(err) = priority_tx.try_send(
1032                                            MykoMessage::ViewError(ViewError {
1033                                                tx: tx_id.to_string(),
1034                                                view_id: view_id_clone.to_string(),
1035                                                message: e,
1036                                            }),
1037                                        ) {
1038                                            drop_logger.on_drop("ViewError", &err);
1039                                        }
1040                                    }
1041                                }
1042                            });
1043                        }
1044                        Err(e) => {
1045                            let message = format!("Failed to parse view {}: {}", view_id, e);
1046                            log::error!(
1047                                "{} | payload: {}",
1048                                message,
1049                                serde_json::to_string(&wrapped.view).unwrap_or_default()
1050                            );
1051                            if let Err(err) =
1052                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
1053                                    tx: tx_id.to_string(),
1054                                    view_id: view_id.to_string(),
1055                                    message,
1056                                }))
1057                            {
1058                                drop_logger.on_drop("ViewError", &err);
1059                            }
1060                        }
1061                    }
1062                } else {
1063                    let message = format!("No registered handler for view: {}", view_id);
1064                    log::warn!("{}", message);
1065                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1066                        tx: tx_id.to_string(),
1067                        view_id: view_id.to_string(),
1068                        message,
1069                    })) {
1070                        drop_logger.on_drop("ViewError", &err);
1071                    }
1072                }
1073            }
1074
1075            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1076                log::trace!(
1077                    "QueryCancel received: client={} tx={}",
1078                    session.client_id,
1079                    tx_id
1080                );
1081                let tx_id: Arc<str> = tx_id.into();
1082                if let Ok(mut map) = query_ids_by_tx.lock() {
1083                    map.remove(&tx_id);
1084                }
1085                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1086                    map.remove(&tx_id);
1087                }
1088                session.cancel(&tx_id);
1089            }
1090
1091            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1092                let tx_id: Arc<str> = tx.into();
1093                log::trace!(
1094                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1095                    session.client_id,
1096                    tx_id,
1097                    window.is_some(),
1098                    session.subscription_count()
1099                );
1100                session.update_query_window(&tx_id, window);
1101            }
1102            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1103                log::trace!("View cancel: {}", tx_id);
1104                let tx_id: Arc<str> = tx_id.into();
1105                if let Ok(mut map) = view_ids_by_tx.lock() {
1106                    map.remove(&tx_id);
1107                }
1108                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1109                    map.remove(&tx_id);
1110                }
1111                session.cancel(&tx_id);
1112            }
1113            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1114                let tx_id: Arc<str> = tx.into();
1115                log::trace!("View window update: {}", tx_id);
1116                session.update_view_window(&tx_id, window);
1117            }
1118
1119            MykoMessage::Report(wrapped) => {
1120                // Extract tx from the report JSON
1121                let tx_id: Arc<str> = wrapped
1122                    .report
1123                    .get("tx")
1124                    .and_then(|v| v.as_str())
1125                    .unwrap_or("unknown")
1126                    .into();
1127                let report_id = &wrapped.report_id;
1128
1129                log::trace!(
1130                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1131                    session.client_id,
1132                    tx_id,
1133                    report_id,
1134                    session.subscription_count()
1135                );
1136
1137                // Look up the report registration
1138                if let Some(report_data) = handler_registry.get_report(report_id) {
1139                    // Parse the report JSON to the concrete type
1140                    let parsed = (report_data.parse)(wrapped.report.clone());
1141                    match parsed {
1142                        Ok(any_report) => {
1143                            let request_context = Arc::new(RequestContext::from_client(
1144                                tx_id.clone(),
1145                                session.client_id.clone(),
1146                                host_id,
1147                            ));
1148
1149                            // Create the cell using the factory (with host_id for context)
1150                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1151                                Ok(cell) => {
1152                                    session.subscribe_report(
1153                                        tx_id,
1154                                        report_id.as_str().into(),
1155                                        cell,
1156                                    );
1157                                }
1158                                Err(e) => {
1159                                    log::error!(
1160                                        "Failed to create report cell for {}: {}",
1161                                        report_id,
1162                                        e
1163                                    );
1164                                }
1165                            }
1166                        }
1167                        Err(e) => {
1168                            log::error!(
1169                                "Failed to parse report {}: {} | payload: {}",
1170                                report_id,
1171                                e,
1172                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1173                            );
1174                        }
1175                    }
1176                } else {
1177                    log::warn!("No registered handler for report: {}", report_id);
1178                }
1179            }
1180
1181            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1182                log::trace!(
1183                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1184                    session.client_id,
1185                    tx_id,
1186                    session.subscription_count()
1187                );
1188                session.cancel(&tx_id.into());
1189            }
1190
1191            MykoMessage::Event(mut event) => {
1192                record_ws_ingest(std::slice::from_ref(&event));
1193                event.sanitize_null_bytes();
1194                normalize_incoming_event(&mut event, &session.client_id, host_id);
1195                if let Err(e) = ctx.apply_event(event) {
1196                    log::error!(
1197                        "Failed to apply event from client {}: {e}",
1198                        session.client_id
1199                    );
1200                }
1201            }
1202
1203            MykoMessage::EventBatch(mut events) => {
1204                record_ws_ingest(&events);
1205                let incoming = events.len();
1206                if incoming >= 64 {
1207                    log::trace!(
1208                        "Received event batch from client {} size={}",
1209                        session.client_id,
1210                        incoming
1211                    );
1212                }
1213                for event in &mut events {
1214                    event.sanitize_null_bytes();
1215                    normalize_incoming_event(event, &session.client_id, host_id);
1216                }
1217                match ctx.apply_event_batch(events) {
1218                    Ok(applied) => {
1219                        log::trace!(
1220                            "Applied event batch from client {} size={}",
1221                            session.client_id,
1222                            applied
1223                        );
1224                    }
1225                    Err(e) => {
1226                        log::error!(
1227                            "Failed to apply event batch from client {}: {}",
1228                            session.client_id,
1229                            e
1230                        );
1231                    }
1232                }
1233            }
1234
1235            MykoMessage::Command(wrapped) => {
1236                // Extract tx from the command JSON
1237                let tx_id: Arc<str> = wrapped
1238                    .command
1239                    .get("tx")
1240                    .and_then(|v| v.as_str())
1241                    .unwrap_or("unknown")
1242                    .into();
1243
1244                let command_id = &wrapped.command_id;
1245
1246                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1247                let received_at = Instant::now();
1248                if let Ok(mut map) = command_started_by_tx.lock() {
1249                    map.insert(tx_id.clone(), received_at);
1250                }
1251                if let Err(e) = command_tx.send(CommandJob {
1252                    tx_id: tx_id.clone(),
1253                    command_id: wrapped.command_id.clone(),
1254                    command: wrapped.command.clone(),
1255                    received_at,
1256                }) {
1257                    log::error!(
1258                        "Failed to enqueue command {} for client {} tx={}: {}",
1259                        command_id,
1260                        session.client_id,
1261                        tx_id,
1262                        e
1263                    );
1264                    let error = MykoMessage::CommandError(CommandError {
1265                        tx: tx_id.to_string(),
1266                        command_id: command_id.to_string(),
1267                        message: "Command queue unavailable".to_string(),
1268                    });
1269                    if let Err(err) = priority_tx.try_send(error) {
1270                        drop_logger.on_drop("CommandError", &err);
1271                    }
1272                }
1273            }
1274
1275            MykoMessage::Ping(PingData { id, timestamp }) => {
1276                // Echo back the ping data
1277                let pong = MykoMessage::Ping(PingData { id, timestamp });
1278                if let Err(e) = priority_tx.try_send(pong) {
1279                    drop_logger.on_drop("Ping", &e);
1280                }
1281            }
1282
1283            // Response messages - these shouldn't come from clients.
1284            MykoMessage::QueryResponse(resp) => {
1285                log::warn!(
1286                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1287                    session.client_id,
1288                    resp.tx,
1289                    resp.sequence,
1290                    resp.upserts.len(),
1291                    resp.deletes.len(),
1292                    session.subscription_count()
1293                );
1294            }
1295            MykoMessage::QueryError(err) => {
1296                log::warn!(
1297                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1298                    session.client_id,
1299                    err.tx,
1300                    err.query_id,
1301                    err.message,
1302                    session.subscription_count()
1303                );
1304            }
1305            MykoMessage::ViewResponse(resp) => {
1306                log::warn!(
1307                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1308                    session.client_id,
1309                    resp.tx,
1310                    resp.sequence,
1311                    resp.upserts.len(),
1312                    resp.deletes.len(),
1313                    session.subscription_count()
1314                );
1315            }
1316            MykoMessage::ViewError(err) => {
1317                log::warn!(
1318                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1319                    session.client_id,
1320                    err.tx,
1321                    err.view_id,
1322                    err.message,
1323                    session.subscription_count()
1324                );
1325            }
1326            MykoMessage::ReportResponse(resp) => {
1327                log::warn!(
1328                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1329                    session.client_id,
1330                    resp.tx,
1331                    session.subscription_count()
1332                );
1333            }
1334            MykoMessage::ReportError(err) => {
1335                log::warn!(
1336                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1337                    session.client_id,
1338                    err.tx,
1339                    err.report_id,
1340                    err.message,
1341                    session.subscription_count()
1342                );
1343            }
1344            MykoMessage::CommandResponse(resp) => {
1345                if resp.tx.trim().is_empty() {
1346                    log::warn!(
1347                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1348                        session.client_id,
1349                        session.subscription_count()
1350                    );
1351                } else {
1352                    let correlated = outbound_commands_by_tx
1353                        .lock()
1354                        .ok()
1355                        .and_then(|mut map| map.remove(&resp.tx));
1356                    if let Some((command_id, started)) = correlated {
1357                        log::debug!(
1358                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1359                            session.client_id,
1360                            resp.tx,
1361                            command_id,
1362                            started.elapsed().as_millis(),
1363                            session.subscription_count()
1364                        );
1365                    } else {
1366                        log::warn!(
1367                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1368                            session.client_id,
1369                            resp.tx,
1370                            session.subscription_count()
1371                        );
1372                    }
1373                }
1374            }
1375            MykoMessage::CommandError(err) => {
1376                if err.tx.trim().is_empty() {
1377                    log::warn!(
1378                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1379                        session.client_id,
1380                        err.command_id,
1381                        err.message,
1382                        session.subscription_count()
1383                    );
1384                } else {
1385                    let correlated = outbound_commands_by_tx
1386                        .lock()
1387                        .ok()
1388                        .and_then(|mut map| map.remove(&err.tx));
1389                    if let Some((command_id, started)) = correlated {
1390                        log::warn!(
1391                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1392                            session.client_id,
1393                            err.tx,
1394                            err.command_id,
1395                            command_id,
1396                            err.message,
1397                            started.elapsed().as_millis(),
1398                            session.subscription_count()
1399                        );
1400                    } else {
1401                        log::warn!(
1402                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1403                            session.client_id,
1404                            err.tx,
1405                            err.command_id,
1406                            err.message,
1407                            session.subscription_count()
1408                        );
1409                    }
1410                }
1411            }
1412            MykoMessage::Benchmark(payload) => {
1413                let stats = ws_benchmark_stats();
1414                ensure_ws_benchmark_logger();
1415                stats.message_count.fetch_add(1, Ordering::Relaxed);
1416                // Estimate payload size from the JSON value
1417                let size = payload.to_string().len() as u64;
1418                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1419            }
1420        }
1421
1422        Ok(())
1423    }
1424
1425    fn execute_command_job(
1426        ctx: Arc<CellServerCtx>,
1427        priority_tx: &mpsc::Sender<MykoMessage>,
1428        drop_logger: &DropLogger,
1429        client_id: Arc<str>,
1430        job: CommandJob,
1431    ) {
1432        let host_id = ctx.host_id;
1433        let started = Instant::now();
1434        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1435        let command_id = job.command_id.clone();
1436
1437        let mut handler_found = false;
1438        for registration in inventory::iter::<CommandHandlerRegistration> {
1439            if registration.command_id == command_id {
1440                handler_found = true;
1441                let executor = (registration.factory)();
1442
1443                let req = Arc::new(RequestContext::from_client(
1444                    job.tx_id.clone(),
1445                    client_id.clone(),
1446                    host_id,
1447                ));
1448                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1449                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1450                let execute_started = Instant::now();
1451
1452                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1453                    Ok(result) => {
1454                        let response = MykoMessage::CommandResponse(CommandResponse {
1455                            response: result,
1456                            tx: job.tx_id.to_string(),
1457                        });
1458                        if let Err(e) = priority_tx.try_send(response) {
1459                            drop_logger.on_drop("CommandResponse", &e);
1460                        }
1461                    }
1462                    Err(e) => {
1463                        let error = MykoMessage::CommandError(CommandError {
1464                            tx: job.tx_id.to_string(),
1465                            command_id: command_id.clone(),
1466                            message: e.message,
1467                        });
1468                        if let Err(err) = priority_tx.try_send(error) {
1469                            drop_logger.on_drop("CommandError", &err);
1470                        }
1471                    }
1472                }
1473                let execute_ms = execute_started.elapsed().as_millis();
1474                let total_ms = job.received_at.elapsed().as_millis();
1475                log::trace!(
1476                    target: "myko_server::ws_perf",
1477                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1478                    client_id,
1479                    job.tx_id,
1480                    command_id,
1481                    queue_wait_ms,
1482                    execute_ms,
1483                    total_ms
1484                );
1485                break;
1486            }
1487        }
1488
1489        if !handler_found {
1490            log::warn!("No registered handler for command: {}", command_id);
1491            let error = MykoMessage::CommandError(CommandError {
1492                tx: job.tx_id.to_string(),
1493                command_id: command_id.clone(),
1494                message: format!("Command handler not found: {}", command_id),
1495            });
1496            if let Err(e) = priority_tx.try_send(error) {
1497                drop_logger.on_drop("CommandError", &e);
1498            }
1499        }
1500
1501        if !handler_found {
1502            log::debug!(
1503                target: "myko_server::ws_perf",
1504                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1505                client_id,
1506                job.tx_id,
1507                command_id,
1508                queue_wait_ms,
1509                job.received_at.elapsed().as_millis()
1510            );
1511        }
1512    }
1513}
1514
1515/// Channel-based WebSocket writer.
1516///
1517/// Sends messages through an mpsc channel which are then
1518/// forwarded to the actual WebSocket.
1519struct ChannelWriter {
1520    tx: mpsc::Sender<OutboundMessage>,
1521    deferred_tx: mpsc::Sender<DeferredOutbound>,
1522    drop_logger: Arc<DropLogger>,
1523    outgoing_format: Arc<AtomicU8>,
1524}
1525
1526impl ChannelWriter {
1527    /// Cheap "is the writer task gone?" check used to short-circuit
1528    /// subscriber callbacks for a disconnected client. `Sender::is_closed`
1529    /// returns true once the matching receiver is dropped, which happens as
1530    /// soon as the write task exits. Avoiding the work here prevents the
1531    /// "buffer full / channel closed" log storm we used to see for 10+
1532    /// seconds after every disconnect while the session was still tearing
1533    /// down its subscription guards.
1534    #[inline]
1535    fn tx_dead(&self) -> bool {
1536        self.tx.is_closed()
1537    }
1538
1539    #[inline]
1540    fn deferred_dead(&self) -> bool {
1541        self.deferred_tx.is_closed()
1542    }
1543}
1544
1545impl WsWriter for ChannelWriter {
1546    fn send(&self, msg: MykoMessage) {
1547        // Fast path: writer is gone. Don't try to send and don't log; the
1548        // dead-channel state is expected after the client disconnects, and
1549        // a dropped subscription will follow shortly when the session
1550        // teardown finishes. Avoiding the log+counter prevents the
1551        // "buffer full / channel closed" warning storm we used to see for
1552        // 10+ seconds after every disconnect.
1553        if self.tx_dead() {
1554            return;
1555        }
1556        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1557            // Closed errors here race with the writer task exiting between
1558            // the is_dead check and the try_send; suppress them too.
1559            if !matches!(e, mpsc::error::TrySendError::Closed(_)) {
1560                self.drop_logger.on_drop("message", &e);
1561            }
1562        }
1563    }
1564
1565    fn protocol(&self) -> MykoProtocol {
1566        MykoProtocol::from(self.outgoing_format.load(Ordering::SeqCst))
1567    }
1568
1569    fn send_serialized_command(
1570        &self,
1571        tx: Arc<str>,
1572        command_id: String,
1573        payload: EncodedCommandMessage,
1574    ) {
1575        if self.tx_dead() {
1576            return;
1577        }
1578        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1579            tx,
1580            command_id,
1581            payload,
1582        }) && !matches!(e, mpsc::error::TrySendError::Closed(_))
1583        {
1584            self.drop_logger.on_drop("serialized_command", &e);
1585        }
1586    }
1587
1588    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1589        if self.deferred_dead() {
1590            return;
1591        }
1592        if let Err(e) = self
1593            .deferred_tx
1594            .try_send(DeferredOutbound::Report(tx, output))
1595            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1596        {
1597            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1598        }
1599    }
1600
1601    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1602        if self.deferred_dead() {
1603            return;
1604        }
1605        if let Err(e) = self
1606            .deferred_tx
1607            .try_send(DeferredOutbound::Query { response, is_view })
1608            && !matches!(e, mpsc::error::TrySendError::Closed(_))
1609        {
1610            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1611        }
1612    }
1613}
1614
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` arrives on the regular
    /// outbound channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is unused here but is kept bound for the
        // duration of the test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            outgoing_format: Arc::new(AtomicU8::new(MykoProtocol::JSON as u8)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let received = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }

    /// The format cell round-trips through `MykoProtocol`: it starts as JSON
    /// and, once CBOR is stored, keeps reading back as CBOR.
    #[test]
    fn outgoing_format_starts_as_json_and_promotes_to_cbor() {
        let outgoing_format = AtomicU8::new(MykoProtocol::JSON as u8);
        let current = || MykoProtocol::from(outgoing_format.load(Ordering::SeqCst));

        // Initially JSON.
        assert_eq!(current(), MykoProtocol::JSON);

        // Simulate receiving a binary frame: promote.
        outgoing_format.store(MykoProtocol::CBOR as u8, Ordering::SeqCst);
        assert_eq!(current(), MykoProtocol::CBOR);

        // No further write happens (per the original note, text frames never
        // write this field), so the value stays at CBOR.
        assert_eq!(current(), MykoProtocol::CBOR);
    }
}