Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    command::{CommandContext, CommandHandlerRegistration},
22    entities::client::{Client, ClientId},
23    relationship::{
24        iter_client_id_registrations, iter_fallback_to_id_registrations,
25        iter_server_owned_registrations,
26    },
27    report::AnyOutput,
28    request::RequestContext,
29    server::{
30        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
31        client_registry::try_client_registry,
32    },
33    wire::{
34        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
35        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
36    },
37};
38use tokio::{net::TcpStream, sync::mpsc, time::interval};
39use tokio_tungstenite::{
40    accept_async_with_config,
41    tungstenite::{Message, protocol::WebSocketConfig},
42};
43use uuid::Uuid;
44
45/// Protocol switch message sent by client to enable binary (msgpack) encoding.
46/// Must match ProtocolMessages.SwitchToMSGPACK in TypeScript client.
47const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
48
49struct WsIngestStats {
50    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
51}
52
53struct WsBenchmarkStats {
54    message_count: AtomicU64,
55    total_bytes: AtomicU64,
56}
57
58static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
59static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
60static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
61static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
62
63fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
64    WS_BENCHMARK_STATS
65        .get_or_init(|| {
66            Arc::new(WsBenchmarkStats {
67                message_count: AtomicU64::new(0),
68                total_bytes: AtomicU64::new(0),
69            })
70        })
71        .clone()
72}
73
74fn ensure_ws_benchmark_logger() {
75    if WS_BENCHMARK_LOGGER_STARTED
76        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
77        .is_err()
78    {
79        return;
80    }
81
82    let stats = ws_benchmark_stats();
83    thread::Builder::new()
84        .name("ws-benchmark-logger".into())
85        .spawn(move || {
86            loop {
87                thread::sleep(Duration::from_secs(1));
88
89                let count = stats.message_count.swap(0, Ordering::Relaxed);
90                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
91
92                if count == 0 {
93                    continue;
94                }
95
96                log::info!(
97                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
98                    count,
99                    bytes,
100                    bytes / count
101                );
102            }
103        })
104        .expect("failed to spawn websocket benchmark logger thread");
105}
106
107fn ws_ingest_stats() -> Arc<WsIngestStats> {
108    WS_INGEST_STATS
109        .get_or_init(|| {
110            Arc::new(WsIngestStats {
111                counts_by_type: DashMap::new(),
112            })
113        })
114        .clone()
115}
116
117fn ensure_ws_ingest_logger() {
118    if WS_INGEST_LOGGER_STARTED
119        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
120        .is_err()
121    {
122        return;
123    }
124
125    let stats = ws_ingest_stats();
126    thread::Builder::new()
127        .name("ws-ingest-logger".into())
128        .spawn(move || {
129            loop {
130                thread::sleep(Duration::from_secs(1));
131
132                let mut per_type = Vec::new();
133                let mut total = 0u64;
134                for entry in &stats.counts_by_type {
135                    let count = entry.value().swap(0, Ordering::Relaxed);
136                    if count == 0 {
137                        continue;
138                    }
139                    total += count;
140                    per_type.push((entry.key().clone(), count));
141                }
142
143                if total == 0 {
144                    continue;
145                }
146
147                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
148                let details = per_type
149                    .into_iter()
150                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
151                    .collect::<Vec<_>>()
152                    .join(" ");
153
154                log::info!("WebSocket ingest last_1s total={} {}", total, details);
155            }
156        })
157        .expect("failed to spawn websocket ingest logger thread");
158}
159
160fn record_ws_ingest(events: &[MEvent]) {
161    if events.is_empty() {
162        return;
163    }
164
165    let stats = ws_ingest_stats();
166    for event in events {
167        let counter = stats
168            .counts_by_type
169            .entry(Arc::<str>::from(event.item_type.as_str()))
170            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
171            .clone();
172        counter.fetch_add(1, Ordering::Relaxed);
173    }
174}
175
176fn normalize_incoming_event(event: &mut MEvent, client_id: &str, host_id: uuid::Uuid) {
177    if event.change_type != MEventType::SET {
178        return;
179    }
180
181    // Auto-populate #[myko_client_id] fields with the connection's client_id
182    for reg in iter_client_id_registrations() {
183        if reg.entity_type == event.item_type {
184            if let Some(obj) = event.item.as_object_mut() {
185                obj.insert(
186                    reg.field_name_json.to_string(),
187                    serde_json::Value::String(client_id.to_string()),
188                );
189            }
190            break;
191        }
192    }
193
194    // Auto-populate #[server_owned] fields with this server's ID
195    for reg in iter_server_owned_registrations() {
196        if reg.entity_type == event.item_type {
197            if let Some(obj) = event.item.as_object_mut() {
198                let field = reg.field_name_json;
199                let current = obj.get(field).and_then(|v| v.as_str()).unwrap_or("");
200                if current.is_empty() {
201                    obj.insert(
202                        field.to_string(),
203                        serde_json::Value::String(host_id.to_string()),
204                    );
205                }
206            }
207            break;
208        }
209    }
210
211    // Auto-populate #[fallback_to_id] fields with the entity's own id
212    // if the field is null or missing.
213    if let Some(obj) = event.item.as_object_mut()
214        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
215    {
216        for reg in iter_fallback_to_id_registrations() {
217            if reg.entity_type == event.item_type {
218                let field = reg.field_name_json;
219                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
220                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
221                }
222            }
223        }
224    }
225}
226
227/// Per-connection drop tracking to avoid log storms when clients fall behind.
228///
229/// When the outbound channel is full, we will drop messages (same as today),
230/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
231struct DropLogger {
232    client_id: Arc<str>,
233    dropped: std::sync::atomic::AtomicU64,
234    last_log_ms: std::sync::atomic::AtomicU64,
235}
236
237impl DropLogger {
238    fn new(client_id: Arc<str>) -> Self {
239        Self {
240            client_id,
241            dropped: std::sync::atomic::AtomicU64::new(0),
242            last_log_ms: std::sync::atomic::AtomicU64::new(0),
243        }
244    }
245
246    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
247        use std::sync::atomic::Ordering;
248
249        self.dropped.fetch_add(1, Ordering::Relaxed);
250
251        // Log at most once per second per connection.
252        let now_ms = std::time::SystemTime::now()
253            .duration_since(std::time::UNIX_EPOCH)
254            .unwrap_or_default()
255            .as_millis() as u64;
256        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
257        if now_ms.saturating_sub(last_ms) < 1000 {
258            return;
259        }
260
261        if self
262            .last_log_ms
263            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
264            .is_err()
265        {
266            return;
267        }
268
269        let n = self.dropped.swap(0, Ordering::Relaxed);
270        log::warn!(
271            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
272            n,
273            self.client_id,
274            kind,
275            err
276        );
277    }
278}
279
280struct CommandJob {
281    tx_id: Arc<str>,
282    command_id: String,
283    command: serde_json::Value,
284    received_at: Instant,
285}
286
287/// Result of an async subscription build (query or view cell_factory).
288enum SubscriptionReady {
289    Query {
290        tx_id: Arc<str>,
291        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
292        window: Option<myko::wire::QueryWindow>,
293    },
294    View {
295        tx_id: Arc<str>,
296        view_id: Arc<str>,
297        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
298        window: Option<myko::wire::QueryWindow>,
299    },
300}
301
302enum OutboundMessage {
303    Message(MykoMessage),
304    SerializedCommand {
305        tx: Arc<str>,
306        command_id: String,
307        payload: EncodedCommandMessage,
308    },
309}
310
311enum DeferredOutbound {
312    Report(Arc<str>, Arc<dyn AnyOutput>),
313    Query {
314        response: PendingQueryResponse,
315        is_view: bool,
316    },
317}
318
319/// WebSocket handler for a single client connection.
320pub struct WsHandler;
321
322impl WsHandler {
323    /// Handle a new WebSocket connection.
324    #[allow(clippy::too_many_arguments)]
325    pub async fn handle_connection(
326        stream: TcpStream,
327        addr: SocketAddr,
328        ctx: Arc<CellServerCtx>,
329    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
330        ensure_ws_ingest_logger();
331
332        let host_id = ctx.host_id;
333
334        let mut ws_config = WebSocketConfig::default();
335        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
336        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
337        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
338        let (mut write, mut read) = ws_stream.split();
339
340        // Create a bounded channel for sending messages to the client
341        // High limit (10k) since we have good memory availability
342        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
343        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
344        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
345        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
346        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
347
348        // Protocol: default to JSON, switch to binary only if client opts in
349        let use_binary = Arc::new(AtomicBool::new(false));
350
351        // Create client session with channel-based writer
352        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
353        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
354        let writer = ChannelWriter {
355            tx: tx.clone(),
356            deferred_tx: deferred_tx.clone(),
357            drop_logger: drop_logger.clone(),
358            use_binary_writer: use_binary.clone(),
359        };
360
361        // Register writer in the global client registry (if initialized)
362        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
363            tx: tx.clone(),
364            deferred_tx: deferred_tx.clone(),
365            drop_logger: drop_logger.clone(),
366            use_binary_writer: use_binary.clone(),
367        });
368        if let Some(registry) = try_client_registry() {
369            registry.register(client_id.clone(), writer_arc);
370        }
371
372        let mut session = ClientSession::new(client_id.clone(), writer);
373
374        let use_binary_writer = use_binary.clone();
375        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
376            Arc::new(Mutex::new(HashMap::new()));
377        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
378            Arc::new(Mutex::new(HashMap::new()));
379        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
380            Arc::new(Mutex::new(HashMap::new()));
381        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
382            Arc::new(Mutex::new(HashMap::new()));
383        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
384            Arc::new(Mutex::new(HashMap::new()));
385
386        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
387
388        // Publish Client entity
389        let client_entity = Client {
390            id: ClientId(client_id.clone()),
391            server_id: host_id.to_string().into(),
392            address: Some(Arc::from(addr.to_string())),
393            windback: None,
394        };
395        if let Err(e) = ctx.set(&client_entity) {
396            log::error!("Failed to persist client entity: {e}");
397        }
398
399        log::info!("Client connected: {} from {}", client_id, addr);
400
401        let write_ctx = ctx.clone();
402        let write_client_id = client_id.clone();
403        let write_addr = addr;
404        let command_ctx = ctx.clone();
405        let command_priority_tx = priority_tx.clone();
406        let command_drop_logger = drop_logger.clone();
407        let command_client_id = client_id.clone();
408
409        // Spawn task to forward messages from channel to WebSocket
410        let write_task = tokio::spawn(async move {
411            let _ctx = write_ctx;
412            let mut normal_open = true;
413            let mut priority_open = true;
414            let mut deferred_open = true;
415            while normal_open || priority_open || deferred_open {
416                let msg = tokio::select! {
417                    biased;
418                    maybe = priority_rx.recv(), if priority_open => {
419                        match maybe {
420                            Some(msg) => OutboundMessage::Message(msg),
421                            None => {
422                                priority_open = false;
423                                continue;
424                            }
425                        }
426                    }
427                    maybe = deferred_rx.recv(), if deferred_open => {
428                        match maybe {
429                            Some(DeferredOutbound::Report(tx, output)) => {
430                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
431                                    response: output.to_value(),
432                                    tx: tx.to_string(),
433                                }))
434                            }
435                            Some(DeferredOutbound::Query { response, is_view }) => {
436                                if is_view {
437                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
438                                } else {
439                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
440                                }
441                            }
442                            None => {
443                                deferred_open = false;
444                                continue;
445                            }
446                        }
447                    }
448                    maybe = rx.recv(), if normal_open => {
449                        match maybe {
450                            Some(msg) => msg,
451                            None => {
452                                normal_open = false;
453                                continue;
454                            }
455                        }
456                    }
457                };
458                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
459                    OutboundMessage::SerializedCommand { tx, .. } => {
460                        ("command", Some(tx.clone()), None, None, None, None)
461                    }
462                    OutboundMessage::Message(msg) => match msg {
463                        MykoMessage::ViewResponse(r) => (
464                            "view_response",
465                            Some(r.tx.clone()),
466                            Some(r.sequence),
467                            Some(r.upserts.len()),
468                            Some(r.deletes.len()),
469                            r.total_count,
470                        ),
471                        MykoMessage::QueryResponse(r) => (
472                            "query_response",
473                            Some(r.tx.clone()),
474                            Some(r.sequence),
475                            Some(r.upserts.len()),
476                            Some(r.deletes.len()),
477                            r.total_count,
478                        ),
479                        MykoMessage::CommandResponse(r) => (
480                            "command_response",
481                            Some(Arc::<str>::from(r.tx.clone())),
482                            None,
483                            None,
484                            None,
485                            None,
486                        ),
487                        MykoMessage::CommandError(r) => (
488                            "command_error",
489                            Some(Arc::<str>::from(r.tx.clone())),
490                            None,
491                            None,
492                            None,
493                            None,
494                        ),
495                        _ => ("other", None, None, None, None, None),
496                    },
497                };
498
499                match &msg {
500                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
501                        if !tx.trim().is_empty()
502                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
503                        {
504                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
505                        }
506                    }
507                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
508                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
509                            && !tx_id.trim().is_empty()
510                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
511                        {
512                            map.insert(
513                                tx_id.to_string(),
514                                (wrapped.command_id.clone(), Instant::now()),
515                            );
516                        }
517                    }
518                    _ => {}
519                }
520
521                let ws_msg = match &msg {
522                    OutboundMessage::SerializedCommand {
523                        payload: EncodedCommandMessage::Json(json),
524                        ..
525                    } => Message::Text(json.clone().into()),
526                    OutboundMessage::SerializedCommand {
527                        payload: EncodedCommandMessage::Msgpack(bytes),
528                        ..
529                    } => Message::Binary(bytes.clone().into()),
530                    OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
531                        match rmp_serde::to_vec(msg) {
532                            Ok(bytes) => Message::Binary(bytes.into()),
533                            Err(e) => {
534                                log::error!("Failed to serialize message to msgpack: {}", e);
535                                continue;
536                            }
537                        }
538                    }
539                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
540                        Ok(json) => Message::Text(json.into()),
541                        Err(e) => {
542                            log::error!("Failed to serialize message to JSON: {}", e);
543                            continue;
544                        }
545                    },
546                };
547                let payload_bytes = match &ws_msg {
548                    Message::Binary(b) => b.len(),
549                    Message::Text(t) => t.len(),
550                    _ => 0,
551                };
552
553                if let Err(err) = write.send(ws_msg).await {
554                    log::error!(
555                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
556                        write_client_id,
557                        write_addr,
558                        kind,
559                        tx_id,
560                        seq,
561                        payload_bytes,
562                        use_binary_writer.load(Ordering::SeqCst),
563                        err
564                    );
565                    break;
566                }
567            }
568            // NOTE(ts): Unregister from client registry immediately so the node
569            // executor stops serializing commands into a dead channel.
570            if let Some(registry) = try_client_registry() {
571                registry.unregister(&write_client_id);
572                log::info!(
573                    "WebSocket writer unregistered client {} from {} (write task exiting)",
574                    write_client_id,
575                    write_addr,
576                );
577            }
578            log::warn!(
579                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
580                write_client_id,
581                write_addr,
582                normal_open,
583                priority_open,
584                deferred_open
585            );
586        });
587
588        // Execute commands on a dedicated worker so ping/cancel traffic is never
589        // blocked by long-running command handlers.
590        let command_started_cleanup = command_started_by_tx.clone();
591        let command_task = tokio::spawn(async move {
592            while let Some(job) = command_rx.recv().await {
593                let command_ctx = command_ctx.clone();
594                let command_priority_tx = command_priority_tx.clone();
595                let command_drop_logger = command_drop_logger.clone();
596                let command_client_id = command_client_id.clone();
597                let tx_id = job.tx_id.clone();
598                let started_map = command_started_cleanup.clone();
599                match tokio::task::spawn_blocking(move || {
600                    Self::execute_command_job(
601                        command_ctx,
602                        &command_priority_tx,
603                        command_drop_logger.as_ref(),
604                        command_client_id,
605                        job,
606                    );
607                })
608                .await
609                {
610                    Ok(()) => {}
611                    Err(e) => {
612                        log::error!("Command worker panicked: {}", e);
613                    }
614                }
615                // NOTE(ts): Clean up timing entry after command completes (success or panic).
616                if let Ok(mut map) = started_map.lock() {
617                    map.remove(&tx_id);
618                }
619            }
620        });
621
622        // Process incoming messages and completed subscription builds concurrently.
623        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
624        // so they don't block command processing or other messages.
625        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
626        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
627        loop {
628            tokio::select! {
629                // Completed subscription builds — register with session
630                Some(ready) = subscribe_rx.recv() => {
631                    let tx_id = match &ready {
632                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
633                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
634                    };
635                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
636                        map.remove(&tx_id);
637                    }
638                    match ready {
639                        SubscriptionReady::Query { tx_id, cellmap, window } => {
640                            session.subscribe_query(tx_id, cellmap, window);
641                        }
642                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
643                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
644                        }
645                    }
646                }
647                // NOTE(ts): Sweep outbound command entries older than 10s.
648                // Responses normally arrive quickly; stale entries are from
649                // dropped connections or commands that will never get a response.
650                _ = outbound_ttl_interval.tick() => {
651                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
652                        let before = map.len();
653                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
654                        let removed = before - map.len();
655                        if removed > 0 {
656                            log::debug!(
657                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
658                                session.client_id,
659                                removed,
660                                map.len()
661                            );
662                        }
663                    }
664                }
665                // Incoming WebSocket messages
666                msg = read.next() => {
667                    let Some(msg) = msg else { break };
668                    let ctx = ctx.clone();
669                    let msg = match msg {
670                        Ok(m) => m,
671                        Err(e) => {
672                            log::error!("WebSocket read error from {}: {}", client_id, e);
673                            break;
674                        }
675                    };
676
677                    match msg {
678                        Message::Binary(data) => {
679                            if !use_binary.load(Ordering::SeqCst) {
680                                log::debug!(
681                                    "Client {} switched to binary (msgpack) protocol via auto-detect",
682                                    client_id
683                                );
684                                use_binary.store(true, Ordering::SeqCst);
685                            }
686
687                            match rmp_serde::from_slice::<MykoMessage>(&data) {
688                                Ok(myko_msg) => {
689                                    if let Err(e) = Self::handle_message(
690                                        &mut session,
691                                        ctx,
692                                        &priority_tx,
693                                        &drop_logger,
694                                        &query_ids_by_tx,
695                                        &view_ids_by_tx,
696                                        &subscribe_started_by_tx,
697                                        &command_started_by_tx,
698                                        &outbound_commands_by_tx,
699                                        &command_tx,
700                                        &subscribe_tx,
701                                        myko_msg,
702                                    ) {
703                                        log::error!("Error handling message: {}", e);
704                                    }
705                                    tokio::task::yield_now().await;
706                                }
707                                Err(e) => {
708                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
709                                }
710                            }
711                        }
712                        Message::Text(text) => {
713                            if text == SWITCH_TO_MSGPACK {
714                                log::debug!(
715                                    "Client {} switched to binary (msgpack) protocol via explicit request",
716                                    client_id
717                                );
718                                if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
719                                    protocol: "msgpack".into(),
720                                }) {
721                                    drop_logger.on_drop("ProtocolSwitch", &e);
722                                }
723                                use_binary.store(true, Ordering::SeqCst);
724                                continue;
725                            }
726
727                            match serde_json::from_str::<MykoMessage>(&text) {
728                                Ok(myko_msg) => {
729                                    if let Err(e) = Self::handle_message(
730                                        &mut session,
731                                        ctx,
732                                        &priority_tx,
733                                        &drop_logger,
734                                        &query_ids_by_tx,
735                                        &view_ids_by_tx,
736                                        &subscribe_started_by_tx,
737                                        &command_started_by_tx,
738                                        &outbound_commands_by_tx,
739                                        &command_tx,
740                                        &subscribe_tx,
741                                        myko_msg,
742                                    ) {
743                                        log::error!("Error handling message: {}", e);
744                                    }
745                                    tokio::task::yield_now().await;
746                                }
747                                Err(e) => {
748                                    log::warn!(
749                                        "Failed to parse JSON message from {}: {} | raw: {}",
750                                        client_id,
751                                        e,
752                                        if text.len() > 1000 {
753                                            &text[..1000]
754                                        } else {
755                                            &text
756                                        }
757                                    );
758                                }
759                            }
760                        }
761                        Message::Ping(data) => {
762                            log::trace!("Ping from {}", client_id);
763                            let _ = data;
764                        }
765                        Message::Pong(_) => {
766                            log::trace!("Pong from {}", client_id);
767                        }
768                        Message::Close(frame) => {
769                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
770                            break;
771                        }
772                        Message::Frame(_) => {}
773                    }
774                }
775            }
776        }
777
778        // Cleanup
779        drop(session); // Drops all subscription guards
780        write_task.abort();
781        command_task.abort();
782
783        // Unregister from client registry
784        if let Some(registry) = try_client_registry() {
785            registry.unregister(&client_id);
786        }
787
788        // Delete Client entity
789        if let Err(e) = ctx.del(&client_entity) {
790            log::error!("Failed to delete client entity: {e}");
791        }
792
793        log::info!("Client disconnected: {} from {}", client_id, addr);
794
795        Ok(())
796    }
797
798    /// Handle a parsed MykoMessage.
799    #[allow(clippy::too_many_arguments)]
800    fn handle_message<W: WsWriter>(
801        session: &mut ClientSession<W>,
802        ctx: Arc<CellServerCtx>,
803        priority_tx: &mpsc::Sender<MykoMessage>,
804        drop_logger: &Arc<DropLogger>,
805        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
806        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
807        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
808        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
809        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
810        command_tx: &mpsc::UnboundedSender<CommandJob>,
811        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
812        msg: MykoMessage,
813    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
814        let handler_registry = ctx.handler_registry.clone();
815
816        let registry = ctx.registry.clone();
817
818        let host_id = ctx.host_id;
819
820        match msg {
821            MykoMessage::Query(wrapped) => {
822                // Extract tx from the query JSON
823                let tx_id: Arc<str> = wrapped
824                    .query
825                    .get("tx")
826                    .and_then(|v| v.as_str())
827                    .unwrap_or("unknown")
828                    .into();
829                let query_id = &wrapped.query_id;
830                let entity_type = &wrapped.query_item_type;
831
832                // Defensive dedupe: some clients can replay the same subscribe request.
833                // A duplicate for the same tx would reset server-side sequence/window state.
834                if session.has_subscription(&tx_id) {
835                    log::debug!(
836                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
837                        session.client_id,
838                        tx_id,
839                        query_id,
840                        entity_type
841                    );
842                    return Ok(());
843                }
844                if let Ok(mut map) = query_ids_by_tx.lock() {
845                    map.insert(tx_id.clone(), query_id.clone());
846                }
847                if let Ok(mut map) = subscribe_started_by_tx.lock() {
848                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
849                }
850
851                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
852                log::trace!(
853                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
854                    session.client_id,
855                    tx_id,
856                    query_id,
857                    entity_type,
858                    wrapped.window.is_some(),
859                    session.subscription_count()
860                );
861
862                let request_context = Arc::new(RequestContext::from_client(
863                    tx_id.clone(),
864                    session.client_id.clone(),
865                    host_id,
866                ));
867
868                if let Some(query_data) = handler_registry.get_query(query_id) {
869                    let parsed = (query_data.parse)(wrapped.query.clone());
870                    match parsed {
871                        Ok(any_query) => {
872                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
873                            // block command processing or other messages.
874                            let cell_factory = query_data.cell_factory;
875                            let registry = registry.clone();
876                            let request_context = request_context.clone();
877                            let ctx = ctx.clone();
878                            let window = wrapped.window.clone();
879                            let query_id = query_id.clone();
880                            let sub_tx = subscribe_tx.clone();
881                            tokio::task::spawn_blocking(move || {
882                                match cell_factory(any_query, registry, request_context, Some(ctx))
883                                {
884                                    Ok(filtered_cellmap) => {
885                                        let _ = sub_tx.send(SubscriptionReady::Query {
886                                            tx_id,
887                                            cellmap: filtered_cellmap,
888                                            window,
889                                        });
890                                    }
891                                    Err(e) => {
892                                        log::error!(
893                                            "Failed to create query cell for {}: {}",
894                                            query_id,
895                                            e
896                                        );
897                                    }
898                                }
899                            });
900                        }
901                        Err(e) => {
902                            log::error!(
903                                "Failed to parse query {}: {} | payload: {}",
904                                query_id,
905                                e,
906                                serde_json::to_string(&wrapped.query).unwrap_or_default()
907                            );
908                        }
909                    }
910                } else {
911                    // Fall back to select all for unknown queries
912                    log::warn!(
913                        "No registered query handler for {}, falling back to select all",
914                        query_id
915                    );
916                    let cellmap = registry.get_or_create(entity_type).select(|_| true);
917                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
918                }
919            }
920
921            MykoMessage::View(wrapped) => {
922                let tx_id: Arc<str> = wrapped
923                    .view
924                    .get("tx")
925                    .and_then(|v| v.as_str())
926                    .unwrap_or("unknown")
927                    .into();
928                let view_id = &wrapped.view_id;
929                let item_type = &wrapped.view_item_type;
930
931                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
932                if session.has_subscription(&tx_id) {
933                    log::debug!(
934                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
935                        session.client_id,
936                        tx_id,
937                        view_id,
938                        item_type
939                    );
940                    return Ok(());
941                }
942                if let Ok(mut map) = view_ids_by_tx.lock() {
943                    map.insert(tx_id.clone(), view_id.clone());
944                }
945                if let Ok(mut map) = subscribe_started_by_tx.lock() {
946                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
947                }
948
949                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
950                log::trace!(
951                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
952                    session.client_id,
953                    tx_id,
954                    view_id,
955                    item_type,
956                    wrapped.window
957                );
958
959                let request_context = Arc::new(RequestContext::from_client(
960                    tx_id.clone(),
961                    session.client_id.clone(),
962                    host_id,
963                ));
964
965                if let Some(view_data) = handler_registry.get_view(view_id) {
966                    let parsed = (view_data.parse)(wrapped.view.clone());
967                    match parsed {
968                        Ok(any_view) => {
969                            log::trace!(
970                                "View parsed successfully client={} tx={} view_id={}",
971                                session.client_id,
972                                tx_id,
973                                view_id
974                            );
975                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
976                            // block command processing or other messages.
977                            let cell_factory = view_data.cell_factory;
978                            let registry = registry.clone();
979                            let ctx = ctx.clone();
980                            let window = wrapped.window.clone();
981                            let view_id_clone = view_id.clone();
982                            let sub_tx = subscribe_tx.clone();
983                            let priority_tx = priority_tx.clone();
984                            let drop_logger = drop_logger.clone();
985                            tokio::task::spawn_blocking(move || {
986                                match cell_factory(any_view, registry, request_context, ctx) {
987                                    Ok(filtered_cellmap) => {
988                                        log::trace!(
989                                            "View cell factory succeeded tx={} view_id={}",
990                                            tx_id,
991                                            view_id_clone
992                                        );
993                                        let _ = sub_tx.send(SubscriptionReady::View {
994                                            tx_id,
995                                            view_id: view_id_clone,
996                                            cellmap: filtered_cellmap,
997                                            window,
998                                        });
999                                    }
1000                                    Err(e) => {
1001                                        log::error!(
1002                                            "Failed to create view cell for {}: {}",
1003                                            view_id_clone,
1004                                            e
1005                                        );
1006                                        if let Err(err) = priority_tx.try_send(
1007                                            MykoMessage::ViewError(ViewError {
1008                                                tx: tx_id.to_string(),
1009                                                view_id: view_id_clone.to_string(),
1010                                                message: e,
1011                                            }),
1012                                        ) {
1013                                            drop_logger.on_drop("ViewError", &err);
1014                                        }
1015                                    }
1016                                }
1017                            });
1018                        }
1019                        Err(e) => {
1020                            let message = format!("Failed to parse view {}: {}", view_id, e);
1021                            log::error!(
1022                                "{} | payload: {}",
1023                                message,
1024                                serde_json::to_string(&wrapped.view).unwrap_or_default()
1025                            );
1026                            if let Err(err) =
1027                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
1028                                    tx: tx_id.to_string(),
1029                                    view_id: view_id.to_string(),
1030                                    message,
1031                                }))
1032                            {
1033                                drop_logger.on_drop("ViewError", &err);
1034                            }
1035                        }
1036                    }
1037                } else {
1038                    let message = format!("No registered handler for view: {}", view_id);
1039                    log::warn!("{}", message);
1040                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1041                        tx: tx_id.to_string(),
1042                        view_id: view_id.to_string(),
1043                        message,
1044                    })) {
1045                        drop_logger.on_drop("ViewError", &err);
1046                    }
1047                }
1048            }
1049
1050            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1051                log::trace!(
1052                    "QueryCancel received: client={} tx={}",
1053                    session.client_id,
1054                    tx_id
1055                );
1056                let tx_id: Arc<str> = tx_id.into();
1057                if let Ok(mut map) = query_ids_by_tx.lock() {
1058                    map.remove(&tx_id);
1059                }
1060                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1061                    map.remove(&tx_id);
1062                }
1063                session.cancel(&tx_id);
1064            }
1065
1066            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1067                let tx_id: Arc<str> = tx.into();
1068                log::trace!(
1069                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1070                    session.client_id,
1071                    tx_id,
1072                    window.is_some(),
1073                    session.subscription_count()
1074                );
1075                session.update_query_window(&tx_id, window);
1076            }
1077            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1078                log::trace!("View cancel: {}", tx_id);
1079                let tx_id: Arc<str> = tx_id.into();
1080                if let Ok(mut map) = view_ids_by_tx.lock() {
1081                    map.remove(&tx_id);
1082                }
1083                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1084                    map.remove(&tx_id);
1085                }
1086                session.cancel(&tx_id);
1087            }
1088            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1089                let tx_id: Arc<str> = tx.into();
1090                log::trace!("View window update: {}", tx_id);
1091                session.update_view_window(&tx_id, window);
1092            }
1093
1094            MykoMessage::Report(wrapped) => {
1095                // Extract tx from the report JSON
1096                let tx_id: Arc<str> = wrapped
1097                    .report
1098                    .get("tx")
1099                    .and_then(|v| v.as_str())
1100                    .unwrap_or("unknown")
1101                    .into();
1102                let report_id = &wrapped.report_id;
1103
1104                log::trace!(
1105                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1106                    session.client_id,
1107                    tx_id,
1108                    report_id,
1109                    session.subscription_count()
1110                );
1111
1112                // Look up the report registration
1113                if let Some(report_data) = handler_registry.get_report(report_id) {
1114                    // Parse the report JSON to the concrete type
1115                    let parsed = (report_data.parse)(wrapped.report.clone());
1116                    match parsed {
1117                        Ok(any_report) => {
1118                            let request_context = Arc::new(RequestContext::from_client(
1119                                tx_id.clone(),
1120                                session.client_id.clone(),
1121                                host_id,
1122                            ));
1123
1124                            // Create the cell using the factory (with host_id for context)
1125                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1126                                Ok(cell) => {
1127                                    session.subscribe_report(
1128                                        tx_id,
1129                                        report_id.as_str().into(),
1130                                        cell,
1131                                    );
1132                                }
1133                                Err(e) => {
1134                                    log::error!(
1135                                        "Failed to create report cell for {}: {}",
1136                                        report_id,
1137                                        e
1138                                    );
1139                                }
1140                            }
1141                        }
1142                        Err(e) => {
1143                            log::error!(
1144                                "Failed to parse report {}: {} | payload: {}",
1145                                report_id,
1146                                e,
1147                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1148                            );
1149                        }
1150                    }
1151                } else {
1152                    log::warn!("No registered handler for report: {}", report_id);
1153                }
1154            }
1155
1156            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1157                log::trace!(
1158                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1159                    session.client_id,
1160                    tx_id,
1161                    session.subscription_count()
1162                );
1163                session.cancel(&tx_id.into());
1164            }
1165
1166            MykoMessage::Event(mut event) => {
1167                record_ws_ingest(std::slice::from_ref(&event));
1168                event.sanitize_null_bytes();
1169                normalize_incoming_event(&mut event, &session.client_id, host_id);
1170                if let Err(e) = ctx.apply_event(event) {
1171                    log::error!(
1172                        "Failed to apply event from client {}: {e}",
1173                        session.client_id
1174                    );
1175                }
1176            }
1177
1178            MykoMessage::EventBatch(mut events) => {
1179                record_ws_ingest(&events);
1180                let incoming = events.len();
1181                if incoming >= 64 {
1182                    log::trace!(
1183                        "Received event batch from client {} size={}",
1184                        session.client_id,
1185                        incoming
1186                    );
1187                }
1188                for event in &mut events {
1189                    event.sanitize_null_bytes();
1190                    normalize_incoming_event(event, &session.client_id, host_id);
1191                }
1192                match ctx.apply_event_batch(events) {
1193                    Ok(applied) => {
1194                        log::trace!(
1195                            "Applied event batch from client {} size={}",
1196                            session.client_id,
1197                            applied
1198                        );
1199                    }
1200                    Err(e) => {
1201                        log::error!(
1202                            "Failed to apply event batch from client {}: {}",
1203                            session.client_id,
1204                            e
1205                        );
1206                    }
1207                }
1208            }
1209
1210            MykoMessage::Command(wrapped) => {
1211                // Extract tx from the command JSON
1212                let tx_id: Arc<str> = wrapped
1213                    .command
1214                    .get("tx")
1215                    .and_then(|v| v.as_str())
1216                    .unwrap_or("unknown")
1217                    .into();
1218
1219                let command_id = &wrapped.command_id;
1220
1221                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1222                let received_at = Instant::now();
1223                if let Ok(mut map) = command_started_by_tx.lock() {
1224                    map.insert(tx_id.clone(), received_at);
1225                }
1226                if let Err(e) = command_tx.send(CommandJob {
1227                    tx_id: tx_id.clone(),
1228                    command_id: wrapped.command_id.clone(),
1229                    command: wrapped.command.clone(),
1230                    received_at,
1231                }) {
1232                    log::error!(
1233                        "Failed to enqueue command {} for client {} tx={}: {}",
1234                        command_id,
1235                        session.client_id,
1236                        tx_id,
1237                        e
1238                    );
1239                    let error = MykoMessage::CommandError(CommandError {
1240                        tx: tx_id.to_string(),
1241                        command_id: command_id.to_string(),
1242                        message: "Command queue unavailable".to_string(),
1243                    });
1244                    if let Err(err) = priority_tx.try_send(error) {
1245                        drop_logger.on_drop("CommandError", &err);
1246                    }
1247                }
1248            }
1249
1250            MykoMessage::Ping(PingData { id, timestamp }) => {
1251                // Echo back the ping data
1252                let pong = MykoMessage::Ping(PingData { id, timestamp });
1253                if let Err(e) = priority_tx.try_send(pong) {
1254                    drop_logger.on_drop("Ping", &e);
1255                }
1256            }
1257
1258            // Response messages - these shouldn't come from clients.
1259            MykoMessage::QueryResponse(resp) => {
1260                log::warn!(
1261                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1262                    session.client_id,
1263                    resp.tx,
1264                    resp.sequence,
1265                    resp.upserts.len(),
1266                    resp.deletes.len(),
1267                    session.subscription_count()
1268                );
1269            }
1270            MykoMessage::QueryError(err) => {
1271                log::warn!(
1272                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1273                    session.client_id,
1274                    err.tx,
1275                    err.query_id,
1276                    err.message,
1277                    session.subscription_count()
1278                );
1279            }
1280            MykoMessage::ViewResponse(resp) => {
1281                log::warn!(
1282                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1283                    session.client_id,
1284                    resp.tx,
1285                    resp.sequence,
1286                    resp.upserts.len(),
1287                    resp.deletes.len(),
1288                    session.subscription_count()
1289                );
1290            }
1291            MykoMessage::ViewError(err) => {
1292                log::warn!(
1293                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1294                    session.client_id,
1295                    err.tx,
1296                    err.view_id,
1297                    err.message,
1298                    session.subscription_count()
1299                );
1300            }
1301            MykoMessage::ReportResponse(resp) => {
1302                log::warn!(
1303                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1304                    session.client_id,
1305                    resp.tx,
1306                    session.subscription_count()
1307                );
1308            }
1309            MykoMessage::ReportError(err) => {
1310                log::warn!(
1311                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1312                    session.client_id,
1313                    err.tx,
1314                    err.report_id,
1315                    err.message,
1316                    session.subscription_count()
1317                );
1318            }
1319            MykoMessage::CommandResponse(resp) => {
1320                if resp.tx.trim().is_empty() {
1321                    log::warn!(
1322                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1323                        session.client_id,
1324                        session.subscription_count()
1325                    );
1326                } else {
1327                    let correlated = outbound_commands_by_tx
1328                        .lock()
1329                        .ok()
1330                        .and_then(|mut map| map.remove(&resp.tx));
1331                    if let Some((command_id, started)) = correlated {
1332                        log::debug!(
1333                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1334                            session.client_id,
1335                            resp.tx,
1336                            command_id,
1337                            started.elapsed().as_millis(),
1338                            session.subscription_count()
1339                        );
1340                    } else {
1341                        log::warn!(
1342                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1343                            session.client_id,
1344                            resp.tx,
1345                            session.subscription_count()
1346                        );
1347                    }
1348                }
1349            }
1350            MykoMessage::CommandError(err) => {
1351                if err.tx.trim().is_empty() {
1352                    log::warn!(
1353                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1354                        session.client_id,
1355                        err.command_id,
1356                        err.message,
1357                        session.subscription_count()
1358                    );
1359                } else {
1360                    let correlated = outbound_commands_by_tx
1361                        .lock()
1362                        .ok()
1363                        .and_then(|mut map| map.remove(&err.tx));
1364                    if let Some((command_id, started)) = correlated {
1365                        log::warn!(
1366                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1367                            session.client_id,
1368                            err.tx,
1369                            err.command_id,
1370                            command_id,
1371                            err.message,
1372                            started.elapsed().as_millis(),
1373                            session.subscription_count()
1374                        );
1375                    } else {
1376                        log::warn!(
1377                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1378                            session.client_id,
1379                            err.tx,
1380                            err.command_id,
1381                            err.message,
1382                            session.subscription_count()
1383                        );
1384                    }
1385                }
1386            }
1387            MykoMessage::Benchmark(payload) => {
1388                let stats = ws_benchmark_stats();
1389                ensure_ws_benchmark_logger();
1390                stats.message_count.fetch_add(1, Ordering::Relaxed);
1391                // Estimate payload size from the JSON value
1392                let size = payload.to_string().len() as u64;
1393                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1394            }
1395            MykoMessage::ProtocolSwitch { protocol } => {
1396                log::warn!(
1397                    "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1398                    session.client_id,
1399                    protocol,
1400                    session.subscription_count()
1401                );
1402            }
1403        }
1404
1405        Ok(())
1406    }
1407
1408    fn execute_command_job(
1409        ctx: Arc<CellServerCtx>,
1410        priority_tx: &mpsc::Sender<MykoMessage>,
1411        drop_logger: &DropLogger,
1412        client_id: Arc<str>,
1413        job: CommandJob,
1414    ) {
1415        let host_id = ctx.host_id;
1416        let started = Instant::now();
1417        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1418        let command_id = job.command_id.clone();
1419
1420        let mut handler_found = false;
1421        for registration in inventory::iter::<CommandHandlerRegistration> {
1422            if registration.command_id == command_id {
1423                handler_found = true;
1424                let executor = (registration.factory)();
1425
1426                let req = Arc::new(RequestContext::from_client(
1427                    job.tx_id.clone(),
1428                    client_id.clone(),
1429                    host_id,
1430                ));
1431                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1432                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1433                let execute_started = Instant::now();
1434
1435                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1436                    Ok(result) => {
1437                        let response = MykoMessage::CommandResponse(CommandResponse {
1438                            response: result,
1439                            tx: job.tx_id.to_string(),
1440                        });
1441                        if let Err(e) = priority_tx.try_send(response) {
1442                            drop_logger.on_drop("CommandResponse", &e);
1443                        }
1444                    }
1445                    Err(e) => {
1446                        let error = MykoMessage::CommandError(CommandError {
1447                            tx: job.tx_id.to_string(),
1448                            command_id: command_id.clone(),
1449                            message: e.message,
1450                        });
1451                        if let Err(err) = priority_tx.try_send(error) {
1452                            drop_logger.on_drop("CommandError", &err);
1453                        }
1454                    }
1455                }
1456                let execute_ms = execute_started.elapsed().as_millis();
1457                let total_ms = job.received_at.elapsed().as_millis();
1458                log::trace!(
1459                    target: "myko_server::ws_perf",
1460                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1461                    client_id,
1462                    job.tx_id,
1463                    command_id,
1464                    queue_wait_ms,
1465                    execute_ms,
1466                    total_ms
1467                );
1468                break;
1469            }
1470        }
1471
1472        if !handler_found {
1473            log::warn!("No registered handler for command: {}", command_id);
1474            let error = MykoMessage::CommandError(CommandError {
1475                tx: job.tx_id.to_string(),
1476                command_id: command_id.clone(),
1477                message: format!("Command handler not found: {}", command_id),
1478            });
1479            if let Err(e) = priority_tx.try_send(error) {
1480                drop_logger.on_drop("CommandError", &e);
1481            }
1482        }
1483
1484        if !handler_found {
1485            log::debug!(
1486                target: "myko_server::ws_perf",
1487                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1488                client_id,
1489                job.tx_id,
1490                command_id,
1491                queue_wait_ms,
1492                job.received_at.elapsed().as_millis()
1493            );
1494        }
1495    }
1496}
1497
1498/// Channel-based WebSocket writer.
1499///
1500/// Sends messages through an mpsc channel which are then
1501/// forwarded to the actual WebSocket.
1502struct ChannelWriter {
1503    tx: mpsc::Sender<OutboundMessage>,
1504    deferred_tx: mpsc::Sender<DeferredOutbound>,
1505    drop_logger: Arc<DropLogger>,
1506    use_binary_writer: Arc<AtomicBool>,
1507}
1508
1509impl WsWriter for ChannelWriter {
1510    fn send(&self, msg: MykoMessage) {
1511        // Use try_send since we're in sync context
1512        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1513            self.drop_logger.on_drop("message", &e);
1514        }
1515    }
1516
1517    fn protocol(&self) -> myko::client::MykoProtocol {
1518        if self.use_binary_writer.load(Ordering::SeqCst) {
1519            myko::client::MykoProtocol::MSGPACK
1520        } else {
1521            myko::client::MykoProtocol::JSON
1522        }
1523    }
1524
1525    fn send_serialized_command(
1526        &self,
1527        tx: Arc<str>,
1528        command_id: String,
1529        payload: EncodedCommandMessage,
1530    ) {
1531        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1532            tx,
1533            command_id,
1534            payload,
1535        }) {
1536            self.drop_logger.on_drop("serialized_command", &e);
1537        }
1538    }
1539
1540    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1541        if let Err(e) = self
1542            .deferred_tx
1543            .try_send(DeferredOutbound::Report(tx, output))
1544        {
1545            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1546        }
1547    }
1548
1549    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1550        if let Err(e) = self
1551            .deferred_tx
1552            .try_send(DeferredOutbound::Query { response, is_view })
1553        {
1554            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1555        }
1556    }
1557}
1558
1559#[cfg(test)]
1560mod tests {
1561    use super::*;
1562
1563    #[test]
1564    fn test_channel_writer() {
1565        let (tx, mut rx) = mpsc::channel(10);
1566        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
1567        let drop_logger = Arc::new(DropLogger::new("test-client".into()));
1568        let writer = ChannelWriter {
1569            tx,
1570            deferred_tx,
1571            drop_logger,
1572            use_binary_writer: Arc::new(AtomicBool::new(false)),
1573        };
1574
1575        let msg = MykoMessage::Ping(PingData {
1576            id: "test".to_string(),
1577            timestamp: 0,
1578        });
1579        writer.send(msg);
1580
1581        let received = rx.try_recv().unwrap();
1582        assert!(matches!(
1583            received,
1584            OutboundMessage::Message(MykoMessage::Ping(_))
1585        ));
1586    }
1587}