Skip to main content

myko_server/
ws_handler.rs

//! WebSocket handler for the cell-based server.
//!
//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    command::{CommandContext, CommandHandlerRegistration},
22    entities::client::{Client, ClientId},
23    relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24    report::AnyOutput,
25    request::RequestContext,
26    server::{
27        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28        client_registry::try_client_registry,
29    },
30    wire::{
31        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33    },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37    accept_async_with_config,
38    tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
/// Protocol switch message sent by client to enable binary (msgpack) encoding.
/// Must match ProtocolMessages.SwitchToMSGPACK in TypeScript client.
///
/// Incoming text frames are compared against this value before any JSON
/// parsing; a match is answered with a `ProtocolSwitch` message and flips the
/// connection's outbound encoding to msgpack.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
/// Process-wide counters of ingested events, keyed by entity type.
///
/// `record_ws_ingest` increments the counters; the "ws-ingest-logger" thread
/// swaps each one back to zero once per second and logs the totals.
struct WsIngestStats {
    /// Number of events seen per `MEvent::item_type` since the last logger pass.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Process-wide message/byte counters for the WebSocket benchmark log line.
///
/// The "ws-benchmark-logger" thread swaps both counters back to zero once per
/// second and logs the message count, byte count and average bytes per message.
struct WsBenchmarkStats {
    /// Messages counted since the last logger pass.
    message_count: AtomicU64,
    /// Bytes counted since the last logger pass.
    total_bytes: AtomicU64,
}
54
// Lazily-initialised shared ingest counters (see `ws_ingest_stats`).
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_ingest_logger` so the logger
// thread is spawned at most once per process.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
// Lazily-initialised shared benchmark counters (see `ws_benchmark_stats`).
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
// Set to `true` by the first caller of `ensure_ws_benchmark_logger` so the
// logger thread is spawned at most once per process.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    if count > 0 { bytes / count } else { 0 },
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[fallback_to_id] fields with the entity's own id
192    // if the field is null or missing.
193    if let Some(obj) = event.item.as_object_mut()
194        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195    {
196        for reg in iter_fallback_to_id_registrations() {
197            if reg.entity_type == event.item_type {
198                let field = reg.field_name_json;
199                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201                }
202            }
203        }
204    }
205}
206
/// Rate-limited drop accounting for a single connection.
///
/// A client that falls behind fills its outbound channel and messages get
/// dropped. Emitting a `warn!` per drop would flood the log and could
/// effectively DoS the server, so drops are counted here and reported in
/// batches instead.
struct DropLogger {
    /// Client this tracker reports for.
    client_id: Arc<str>,
    /// Drops accumulated since the last emitted warning.
    dropped: AtomicU64,
    /// Wall-clock time (ms since the Unix epoch) of the last emitted warning.
    last_log_ms: AtomicU64,
}
216
217impl DropLogger {
218    fn new(client_id: Arc<str>) -> Self {
219        Self {
220            client_id,
221            dropped: std::sync::atomic::AtomicU64::new(0),
222            last_log_ms: std::sync::atomic::AtomicU64::new(0),
223        }
224    }
225
226    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227        use std::sync::atomic::Ordering;
228
229        self.dropped.fetch_add(1, Ordering::Relaxed);
230
231        // Log at most once per second per connection.
232        let now_ms = std::time::SystemTime::now()
233            .duration_since(std::time::UNIX_EPOCH)
234            .unwrap_or_default()
235            .as_millis() as u64;
236        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237        if now_ms.saturating_sub(last_ms) < 1000 {
238            return;
239        }
240
241        if self
242            .last_log_ms
243            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244            .is_err()
245        {
246            return;
247        }
248
249        let n = self.dropped.swap(0, Ordering::Relaxed);
250        log::warn!(
251            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252            n,
253            self.client_id,
254            kind,
255            err
256        );
257    }
258}
259
/// A command queued for the per-connection command worker.
///
/// Jobs travel over the unbounded command channel and are passed, one at a
/// time, to `WsHandler::execute_command_job` on the blocking thread pool.
struct CommandJob {
    /// Transaction id; the worker also uses it to clear the command's timing
    /// entry once the job finishes.
    tx_id: Arc<str>,
    /// Identifier of the command to run.
    command_id: String,
    /// Command payload as JSON.
    command: serde_json::Value,
    /// Timestamp captured for this job (presumably when the command arrived;
    /// the producer is not visible in this part of the file).
    received_at: Instant,
}
266
/// Result of an async subscription build (query or view cell_factory).
///
/// Values arrive on the subscribe channel; the connection loop clears the
/// matching subscribe-timing entry and registers the subscription with the
/// client session.
enum SubscriptionReady {
    /// A query subscription, registered via `ClientSession::subscribe_query`.
    Query {
        tx_id: Arc<str>,
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view subscription, registered via
    /// `ClientSession::subscribe_view_with_id`.
    View {
        tx_id: Arc<str>,
        view_id: Arc<str>,
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        window: Option<myko::wire::QueryWindow>,
    },
}
281
/// An item on the normal outbound channel, consumed by the writer task.
enum OutboundMessage {
    /// A message the writer task serializes itself: msgpack when the
    /// connection has switched to binary, JSON otherwise.
    Message(MykoMessage),
    /// A command whose payload is already encoded; the writer task sends it
    /// as a text or binary frame according to the payload variant and records
    /// `tx`/`command_id` in the outbound-command map.
    SerializedCommand {
        tx: Arc<str>,
        command_id: String,
        payload: EncodedCommandMessage,
    },
}
290
/// An outbound response whose conversion to a wire message is deferred to
/// the writer task.
enum DeferredOutbound {
    /// Report output for the given tx; the writer task turns it into a
    /// `ReportResponse` via `to_value()`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view response; the writer task calls `into_wire()` and
    /// wraps the result as `ViewResponse` when `is_view` is true, otherwise
    /// as `QueryResponse`.
    Query {
        response: PendingQueryResponse,
        is_view: bool,
    },
}
298
/// WebSocket handler for a single client connection.
///
/// Stateless unit type: it only namespaces the connection-handling functions
/// (entry point: `handle_connection`).
pub struct WsHandler;
301
302impl WsHandler {
303    /// Handle a new WebSocket connection.
304    #[allow(clippy::too_many_arguments)]
305    pub async fn handle_connection(
306        stream: TcpStream,
307        addr: SocketAddr,
308        ctx: Arc<CellServerCtx>,
309    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310        ensure_ws_ingest_logger();
311
312        let host_id = ctx.host_id;
313
314        let mut ws_config = WebSocketConfig::default();
315        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318        let (mut write, mut read) = ws_stream.split();
319
320        // Create a bounded channel for sending messages to the client
321        // High limit (10k) since we have good memory availability
322        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328        // Protocol: default to JSON, switch to binary only if client opts in
329        let use_binary = Arc::new(AtomicBool::new(false));
330
331        // Create client session with channel-based writer
332        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334        let writer = ChannelWriter {
335            tx: tx.clone(),
336            deferred_tx: deferred_tx.clone(),
337            drop_logger: drop_logger.clone(),
338            use_binary_writer: use_binary.clone(),
339        };
340
341        // Register writer in the global client registry (if initialized)
342        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343            tx: tx.clone(),
344            deferred_tx: deferred_tx.clone(),
345            drop_logger: drop_logger.clone(),
346            use_binary_writer: use_binary.clone(),
347        });
348        if let Some(registry) = try_client_registry() {
349            registry.register(client_id.clone(), writer_arc);
350        }
351
352        let mut session = ClientSession::new(client_id.clone(), writer);
353
354        let use_binary_writer = use_binary.clone();
355        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356            Arc::new(Mutex::new(HashMap::new()));
357        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358            Arc::new(Mutex::new(HashMap::new()));
359        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360            Arc::new(Mutex::new(HashMap::new()));
361        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362            Arc::new(Mutex::new(HashMap::new()));
363        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364            Arc::new(Mutex::new(HashMap::new()));
365
366        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368        // Publish Client entity
369        let client_entity = Client {
370            id: ClientId(client_id.clone()),
371            server_id: host_id.to_string().into(),
372            address: Some(Arc::from(addr.to_string())),
373            windback: None,
374        };
375        if let Err(e) = ctx.set(&client_entity) {
376            log::error!("Failed to persist client entity: {e}");
377        }
378
379        log::info!("Client connected: {} from {}", client_id, addr);
380
381        let write_ctx = ctx.clone();
382        let write_client_id = client_id.clone();
383        let write_addr = addr;
384        let command_ctx = ctx.clone();
385        let command_priority_tx = priority_tx.clone();
386        let command_drop_logger = drop_logger.clone();
387        let command_client_id = client_id.clone();
388
389        // Spawn task to forward messages from channel to WebSocket
390        let write_task = tokio::spawn(async move {
391            let _ctx = write_ctx;
392            let mut normal_open = true;
393            let mut priority_open = true;
394            let mut deferred_open = true;
395            while normal_open || priority_open || deferred_open {
396                let msg = tokio::select! {
397                    biased;
398                    maybe = priority_rx.recv(), if priority_open => {
399                        match maybe {
400                            Some(msg) => OutboundMessage::Message(msg),
401                            None => {
402                                priority_open = false;
403                                continue;
404                            }
405                        }
406                    }
407                    maybe = deferred_rx.recv(), if deferred_open => {
408                        match maybe {
409                            Some(DeferredOutbound::Report(tx, output)) => {
410                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
411                                    response: output.to_value(),
412                                    tx: tx.to_string(),
413                                }))
414                            }
415                            Some(DeferredOutbound::Query { response, is_view }) => {
416                                if is_view {
417                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
418                                } else {
419                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
420                                }
421                            }
422                            None => {
423                                deferred_open = false;
424                                continue;
425                            }
426                        }
427                    }
428                    maybe = rx.recv(), if normal_open => {
429                        match maybe {
430                            Some(msg) => msg,
431                            None => {
432                                normal_open = false;
433                                continue;
434                            }
435                        }
436                    }
437                };
438                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
439                    OutboundMessage::SerializedCommand { tx, .. } => {
440                        ("command", Some(tx.clone()), None, None, None, None)
441                    }
442                    OutboundMessage::Message(msg) => match msg {
443                        MykoMessage::ViewResponse(r) => (
444                            "view_response",
445                            Some(r.tx.clone()),
446                            Some(r.sequence),
447                            Some(r.upserts.len()),
448                            Some(r.deletes.len()),
449                            r.total_count,
450                        ),
451                        MykoMessage::QueryResponse(r) => (
452                            "query_response",
453                            Some(r.tx.clone()),
454                            Some(r.sequence),
455                            Some(r.upserts.len()),
456                            Some(r.deletes.len()),
457                            r.total_count,
458                        ),
459                        MykoMessage::CommandResponse(r) => (
460                            "command_response",
461                            Some(Arc::<str>::from(r.tx.clone())),
462                            None,
463                            None,
464                            None,
465                            None,
466                        ),
467                        MykoMessage::CommandError(r) => (
468                            "command_error",
469                            Some(Arc::<str>::from(r.tx.clone())),
470                            None,
471                            None,
472                            None,
473                            None,
474                        ),
475                        _ => ("other", None, None, None, None, None),
476                    },
477                };
478
479                match &msg {
480                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
481                        if !tx.trim().is_empty()
482                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
483                        {
484                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
485                        }
486                    }
487                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
488                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
489                            && !tx_id.trim().is_empty()
490                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
491                        {
492                            map.insert(
493                                tx_id.to_string(),
494                                (wrapped.command_id.clone(), Instant::now()),
495                            );
496                        }
497                    }
498                    _ => {}
499                }
500
501                let ws_msg = match &msg {
502                    OutboundMessage::SerializedCommand {
503                        payload: EncodedCommandMessage::Json(json),
504                        ..
505                    } => Message::Text(json.clone().into()),
506                    OutboundMessage::SerializedCommand {
507                        payload: EncodedCommandMessage::Msgpack(bytes),
508                        ..
509                    } => Message::Binary(bytes.clone().into()),
510                    OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
511                        match rmp_serde::to_vec(msg) {
512                            Ok(bytes) => Message::Binary(bytes.into()),
513                            Err(e) => {
514                                log::error!("Failed to serialize message to msgpack: {}", e);
515                                continue;
516                            }
517                        }
518                    }
519                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
520                        Ok(json) => Message::Text(json.into()),
521                        Err(e) => {
522                            log::error!("Failed to serialize message to JSON: {}", e);
523                            continue;
524                        }
525                    },
526                };
527                let payload_bytes = match &ws_msg {
528                    Message::Binary(b) => b.len(),
529                    Message::Text(t) => t.len(),
530                    _ => 0,
531                };
532
533                if let Err(err) = write.send(ws_msg).await {
534                    log::error!(
535                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
536                        write_client_id,
537                        write_addr,
538                        kind,
539                        tx_id,
540                        seq,
541                        payload_bytes,
542                        use_binary_writer.load(Ordering::SeqCst),
543                        err
544                    );
545                    break;
546                }
547            }
548            // NOTE(ts): Unregister from client registry immediately so the node
549            // executor stops serializing commands into a dead channel.
550            if let Some(registry) = try_client_registry() {
551                registry.unregister(&write_client_id);
552                log::info!(
553                    "WebSocket writer unregistered client {} from {} (write task exiting)",
554                    write_client_id,
555                    write_addr,
556                );
557            }
558            log::warn!(
559                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
560                write_client_id,
561                write_addr,
562                normal_open,
563                priority_open,
564                deferred_open
565            );
566        });
567
568        // Execute commands on a dedicated worker so ping/cancel traffic is never
569        // blocked by long-running command handlers.
570        let command_started_cleanup = command_started_by_tx.clone();
571        let command_task = tokio::spawn(async move {
572            while let Some(job) = command_rx.recv().await {
573                let command_ctx = command_ctx.clone();
574                let command_priority_tx = command_priority_tx.clone();
575                let command_drop_logger = command_drop_logger.clone();
576                let command_client_id = command_client_id.clone();
577                let tx_id = job.tx_id.clone();
578                let started_map = command_started_cleanup.clone();
579                match tokio::task::spawn_blocking(move || {
580                    Self::execute_command_job(
581                        command_ctx,
582                        &command_priority_tx,
583                        command_drop_logger.as_ref(),
584                        command_client_id,
585                        job,
586                    );
587                })
588                .await
589                {
590                    Ok(()) => {}
591                    Err(e) => {
592                        log::error!("Command worker panicked: {}", e);
593                    }
594                }
595                // NOTE(ts): Clean up timing entry after command completes (success or panic).
596                if let Ok(mut map) = started_map.lock() {
597                    map.remove(&tx_id);
598                }
599            }
600        });
601
602        // Process incoming messages and completed subscription builds concurrently.
603        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
604        // so they don't block command processing or other messages.
605        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
606        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
607        loop {
608            tokio::select! {
609                // Completed subscription builds — register with session
610                Some(ready) = subscribe_rx.recv() => {
611                    let tx_id = match &ready {
612                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
613                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
614                    };
615                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
616                        map.remove(&tx_id);
617                    }
618                    match ready {
619                        SubscriptionReady::Query { tx_id, cellmap, window } => {
620                            session.subscribe_query(tx_id, cellmap, window);
621                        }
622                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
623                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
624                        }
625                    }
626                }
627                // NOTE(ts): Sweep outbound command entries older than 10s.
628                // Responses normally arrive quickly; stale entries are from
629                // dropped connections or commands that will never get a response.
630                _ = outbound_ttl_interval.tick() => {
631                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
632                        let before = map.len();
633                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
634                        let removed = before - map.len();
635                        if removed > 0 {
636                            log::debug!(
637                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
638                                session.client_id,
639                                removed,
640                                map.len()
641                            );
642                        }
643                    }
644                }
645                // Incoming WebSocket messages
646                msg = read.next() => {
647                    let Some(msg) = msg else { break };
648                    let ctx = ctx.clone();
649                    let msg = match msg {
650                        Ok(m) => m,
651                        Err(e) => {
652                            log::error!("WebSocket read error from {}: {}", client_id, e);
653                            break;
654                        }
655                    };
656
657                    match msg {
658                        Message::Binary(data) => {
659                            if !use_binary.load(Ordering::SeqCst) {
660                                log::debug!(
661                                    "Client {} switched to binary (msgpack) protocol via auto-detect",
662                                    client_id
663                                );
664                                use_binary.store(true, Ordering::SeqCst);
665                            }
666
667                            match rmp_serde::from_slice::<MykoMessage>(&data) {
668                                Ok(myko_msg) => {
669                                    if let Err(e) = Self::handle_message(
670                                        &mut session,
671                                        ctx,
672                                        &priority_tx,
673                                        &drop_logger,
674                                        &query_ids_by_tx,
675                                        &view_ids_by_tx,
676                                        &subscribe_started_by_tx,
677                                        &command_started_by_tx,
678                                        &outbound_commands_by_tx,
679                                        &command_tx,
680                                        &subscribe_tx,
681                                        myko_msg,
682                                    ) {
683                                        log::error!("Error handling message: {}", e);
684                                    }
685                                    tokio::task::yield_now().await;
686                                }
687                                Err(e) => {
688                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
689                                }
690                            }
691                        }
692                        Message::Text(text) => {
693                            if text == SWITCH_TO_MSGPACK {
694                                log::debug!(
695                                    "Client {} switched to binary (msgpack) protocol via explicit request",
696                                    client_id
697                                );
698                                if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
699                                    protocol: "msgpack".into(),
700                                }) {
701                                    drop_logger.on_drop("ProtocolSwitch", &e);
702                                }
703                                use_binary.store(true, Ordering::SeqCst);
704                                continue;
705                            }
706
707                            match serde_json::from_str::<MykoMessage>(&text) {
708                                Ok(myko_msg) => {
709                                    if let Err(e) = Self::handle_message(
710                                        &mut session,
711                                        ctx,
712                                        &priority_tx,
713                                        &drop_logger,
714                                        &query_ids_by_tx,
715                                        &view_ids_by_tx,
716                                        &subscribe_started_by_tx,
717                                        &command_started_by_tx,
718                                        &outbound_commands_by_tx,
719                                        &command_tx,
720                                        &subscribe_tx,
721                                        myko_msg,
722                                    ) {
723                                        log::error!("Error handling message: {}", e);
724                                    }
725                                    tokio::task::yield_now().await;
726                                }
727                                Err(e) => {
728                                    log::warn!(
729                                        "Failed to parse JSON message from {}: {} | raw: {}",
730                                        client_id,
731                                        e,
732                                        if text.len() > 1000 {
733                                            &text[..1000]
734                                        } else {
735                                            &text
736                                        }
737                                    );
738                                }
739                            }
740                        }
741                        Message::Ping(data) => {
742                            log::trace!("Ping from {}", client_id);
743                            let _ = data;
744                        }
745                        Message::Pong(_) => {
746                            log::trace!("Pong from {}", client_id);
747                        }
748                        Message::Close(frame) => {
749                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
750                            break;
751                        }
752                        Message::Frame(_) => {}
753                    }
754                }
755            }
756        }
757
758        // Cleanup
759        drop(session); // Drops all subscription guards
760        write_task.abort();
761        command_task.abort();
762
763        // Unregister from client registry
764        if let Some(registry) = try_client_registry() {
765            registry.unregister(&client_id);
766        }
767
768        // Delete Client entity
769        if let Err(e) = ctx.del(&client_entity) {
770            log::error!("Failed to delete client entity: {e}");
771        }
772
773        log::info!("Client disconnected: {} from {}", client_id, addr);
774
775        Ok(())
776    }
777
778    /// Handle a parsed MykoMessage.
779    #[allow(clippy::too_many_arguments)]
780    fn handle_message<W: WsWriter>(
781        session: &mut ClientSession<W>,
782        ctx: Arc<CellServerCtx>,
783        priority_tx: &mpsc::Sender<MykoMessage>,
784        drop_logger: &Arc<DropLogger>,
785        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
786        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
787        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
788        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
789        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
790        command_tx: &mpsc::UnboundedSender<CommandJob>,
791        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
792        msg: MykoMessage,
793    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
794        let handler_registry = ctx.handler_registry.clone();
795
796        let registry = ctx.registry.clone();
797
798        let host_id = ctx.host_id;
799
800        match msg {
801            MykoMessage::Query(wrapped) => {
802                // Extract tx from the query JSON
803                let tx_id: Arc<str> = wrapped
804                    .query
805                    .get("tx")
806                    .and_then(|v| v.as_str())
807                    .unwrap_or("unknown")
808                    .into();
809                let query_id = &wrapped.query_id;
810                let entity_type = &wrapped.query_item_type;
811
812                // Defensive dedupe: some clients can replay the same subscribe request.
813                // A duplicate for the same tx would reset server-side sequence/window state.
814                if session.has_subscription(&tx_id) {
815                    log::debug!(
816                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
817                        session.client_id,
818                        tx_id,
819                        query_id,
820                        entity_type
821                    );
822                    return Ok(());
823                }
824                if let Ok(mut map) = query_ids_by_tx.lock() {
825                    map.insert(tx_id.clone(), query_id.clone());
826                }
827                if let Ok(mut map) = subscribe_started_by_tx.lock() {
828                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
829                }
830
831                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
832                log::trace!(
833                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
834                    session.client_id,
835                    tx_id,
836                    query_id,
837                    entity_type,
838                    wrapped.window.is_some(),
839                    session.subscription_count()
840                );
841
842                let request_context = Arc::new(RequestContext::from_client(
843                    tx_id.clone(),
844                    session.client_id.clone(),
845                    host_id,
846                ));
847
848                if let Some(query_data) = handler_registry.get_query(query_id) {
849                    let parsed = (query_data.parse)(wrapped.query.clone());
850                    match parsed {
851                        Ok(any_query) => {
852                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
853                            // block command processing or other messages.
854                            let cell_factory = query_data.cell_factory;
855                            let registry = registry.clone();
856                            let request_context = request_context.clone();
857                            let ctx = ctx.clone();
858                            let window = wrapped.window.clone();
859                            let query_id = query_id.clone();
860                            let sub_tx = subscribe_tx.clone();
861                            tokio::task::spawn_blocking(move || {
862                                match cell_factory(any_query, registry, request_context, Some(ctx))
863                                {
864                                    Ok(filtered_cellmap) => {
865                                        let _ = sub_tx.send(SubscriptionReady::Query {
866                                            tx_id,
867                                            cellmap: filtered_cellmap,
868                                            window,
869                                        });
870                                    }
871                                    Err(e) => {
872                                        log::error!(
873                                            "Failed to create query cell for {}: {}",
874                                            query_id,
875                                            e
876                                        );
877                                    }
878                                }
879                            });
880                        }
881                        Err(e) => {
882                            log::error!(
883                                "Failed to parse query {}: {} | payload: {}",
884                                query_id,
885                                e,
886                                serde_json::to_string(&wrapped.query).unwrap_or_default()
887                            );
888                        }
889                    }
890                } else {
891                    // Fall back to select all for unknown queries
892                    log::warn!(
893                        "No registered query handler for {}, falling back to select all",
894                        query_id
895                    );
896                    let cellmap = registry.get_or_create(entity_type).select(|_| true);
897                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
898                }
899            }
900
901            MykoMessage::View(wrapped) => {
902                let tx_id: Arc<str> = wrapped
903                    .view
904                    .get("tx")
905                    .and_then(|v| v.as_str())
906                    .unwrap_or("unknown")
907                    .into();
908                let view_id = &wrapped.view_id;
909                let item_type = &wrapped.view_item_type;
910
911                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
912                if session.has_subscription(&tx_id) {
913                    log::debug!(
914                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
915                        session.client_id,
916                        tx_id,
917                        view_id,
918                        item_type
919                    );
920                    return Ok(());
921                }
922                if let Ok(mut map) = view_ids_by_tx.lock() {
923                    map.insert(tx_id.clone(), view_id.clone());
924                }
925                if let Ok(mut map) = subscribe_started_by_tx.lock() {
926                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
927                }
928
929                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
930                log::trace!(
931                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
932                    session.client_id,
933                    tx_id,
934                    view_id,
935                    item_type,
936                    wrapped.window
937                );
938
939                let request_context = Arc::new(RequestContext::from_client(
940                    tx_id.clone(),
941                    session.client_id.clone(),
942                    host_id,
943                ));
944
945                if let Some(view_data) = handler_registry.get_view(view_id) {
946                    let parsed = (view_data.parse)(wrapped.view.clone());
947                    match parsed {
948                        Ok(any_view) => {
949                            log::trace!(
950                                "View parsed successfully client={} tx={} view_id={}",
951                                session.client_id,
952                                tx_id,
953                                view_id
954                            );
955                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
956                            // block command processing or other messages.
957                            let cell_factory = view_data.cell_factory;
958                            let registry = registry.clone();
959                            let ctx = ctx.clone();
960                            let window = wrapped.window.clone();
961                            let view_id_clone = view_id.clone();
962                            let sub_tx = subscribe_tx.clone();
963                            let priority_tx = priority_tx.clone();
964                            let drop_logger = drop_logger.clone();
965                            tokio::task::spawn_blocking(move || {
966                                match cell_factory(any_view, registry, request_context, ctx) {
967                                    Ok(filtered_cellmap) => {
968                                        log::trace!(
969                                            "View cell factory succeeded tx={} view_id={}",
970                                            tx_id,
971                                            view_id_clone
972                                        );
973                                        let _ = sub_tx.send(SubscriptionReady::View {
974                                            tx_id,
975                                            view_id: view_id_clone,
976                                            cellmap: filtered_cellmap,
977                                            window,
978                                        });
979                                    }
980                                    Err(e) => {
981                                        log::error!(
982                                            "Failed to create view cell for {}: {}",
983                                            view_id_clone,
984                                            e
985                                        );
986                                        if let Err(err) = priority_tx.try_send(
987                                            MykoMessage::ViewError(ViewError {
988                                                tx: tx_id.to_string(),
989                                                view_id: view_id_clone.to_string(),
990                                                message: e,
991                                            }),
992                                        ) {
993                                            drop_logger.on_drop("ViewError", &err);
994                                        }
995                                    }
996                                }
997                            });
998                        }
999                        Err(e) => {
1000                            let message = format!("Failed to parse view {}: {}", view_id, e);
1001                            log::error!(
1002                                "{} | payload: {}",
1003                                message,
1004                                serde_json::to_string(&wrapped.view).unwrap_or_default()
1005                            );
1006                            if let Err(err) =
1007                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
1008                                    tx: tx_id.to_string(),
1009                                    view_id: view_id.to_string(),
1010                                    message,
1011                                }))
1012                            {
1013                                drop_logger.on_drop("ViewError", &err);
1014                            }
1015                        }
1016                    }
1017                } else {
1018                    let message = format!("No registered handler for view: {}", view_id);
1019                    log::warn!("{}", message);
1020                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1021                        tx: tx_id.to_string(),
1022                        view_id: view_id.to_string(),
1023                        message,
1024                    })) {
1025                        drop_logger.on_drop("ViewError", &err);
1026                    }
1027                }
1028            }
1029
1030            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1031                log::trace!(
1032                    "QueryCancel received: client={} tx={}",
1033                    session.client_id,
1034                    tx_id
1035                );
1036                let tx_id: Arc<str> = tx_id.into();
1037                if let Ok(mut map) = query_ids_by_tx.lock() {
1038                    map.remove(&tx_id);
1039                }
1040                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1041                    map.remove(&tx_id);
1042                }
1043                session.cancel(&tx_id);
1044            }
1045
1046            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1047                let tx_id: Arc<str> = tx.into();
1048                log::trace!(
1049                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1050                    session.client_id,
1051                    tx_id,
1052                    window.is_some(),
1053                    session.subscription_count()
1054                );
1055                session.update_query_window(&tx_id, window);
1056            }
1057            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1058                log::trace!("View cancel: {}", tx_id);
1059                let tx_id: Arc<str> = tx_id.into();
1060                if let Ok(mut map) = view_ids_by_tx.lock() {
1061                    map.remove(&tx_id);
1062                }
1063                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1064                    map.remove(&tx_id);
1065                }
1066                session.cancel(&tx_id);
1067            }
1068            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1069                let tx_id: Arc<str> = tx.into();
1070                log::trace!("View window update: {}", tx_id);
1071                session.update_view_window(&tx_id, window);
1072            }
1073
1074            MykoMessage::Report(wrapped) => {
1075                // Extract tx from the report JSON
1076                let tx_id: Arc<str> = wrapped
1077                    .report
1078                    .get("tx")
1079                    .and_then(|v| v.as_str())
1080                    .unwrap_or("unknown")
1081                    .into();
1082                let report_id = &wrapped.report_id;
1083
1084                log::trace!(
1085                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1086                    session.client_id,
1087                    tx_id,
1088                    report_id,
1089                    session.subscription_count()
1090                );
1091
1092                // Look up the report registration
1093                if let Some(report_data) = handler_registry.get_report(report_id) {
1094                    // Parse the report JSON to the concrete type
1095                    let parsed = (report_data.parse)(wrapped.report.clone());
1096                    match parsed {
1097                        Ok(any_report) => {
1098                            let request_context = Arc::new(RequestContext::from_client(
1099                                tx_id.clone(),
1100                                session.client_id.clone(),
1101                                host_id,
1102                            ));
1103
1104                            // Create the cell using the factory (with host_id for context)
1105                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1106                                Ok(cell) => {
1107                                    session.subscribe_report(
1108                                        tx_id,
1109                                        report_id.as_str().into(),
1110                                        cell,
1111                                    );
1112                                }
1113                                Err(e) => {
1114                                    log::error!(
1115                                        "Failed to create report cell for {}: {}",
1116                                        report_id,
1117                                        e
1118                                    );
1119                                }
1120                            }
1121                        }
1122                        Err(e) => {
1123                            log::error!(
1124                                "Failed to parse report {}: {} | payload: {}",
1125                                report_id,
1126                                e,
1127                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1128                            );
1129                        }
1130                    }
1131                } else {
1132                    log::warn!("No registered handler for report: {}", report_id);
1133                }
1134            }
1135
1136            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1137                log::trace!(
1138                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1139                    session.client_id,
1140                    tx_id,
1141                    session.subscription_count()
1142                );
1143                session.cancel(&tx_id.into());
1144            }
1145
1146            MykoMessage::Event(mut event) => {
1147                record_ws_ingest(std::slice::from_ref(&event));
1148                event.sanitize_null_bytes();
1149                normalize_incoming_event(&mut event, &session.client_id);
1150                if let Err(e) = ctx.apply_event(event) {
1151                    log::error!(
1152                        "Failed to apply event from client {}: {e}",
1153                        session.client_id
1154                    );
1155                }
1156            }
1157
1158            MykoMessage::EventBatch(mut events) => {
1159                record_ws_ingest(&events);
1160                let incoming = events.len();
1161                if incoming >= 64 {
1162                    log::trace!(
1163                        "Received event batch from client {} size={}",
1164                        session.client_id,
1165                        incoming
1166                    );
1167                }
1168                for event in &mut events {
1169                    event.sanitize_null_bytes();
1170                    normalize_incoming_event(event, &session.client_id);
1171                }
1172                match ctx.apply_event_batch(events) {
1173                    Ok(applied) => {
1174                        log::trace!(
1175                            "Applied event batch from client {} size={}",
1176                            session.client_id,
1177                            applied
1178                        );
1179                    }
1180                    Err(e) => {
1181                        log::error!(
1182                            "Failed to apply event batch from client {}: {}",
1183                            session.client_id,
1184                            e
1185                        );
1186                    }
1187                }
1188            }
1189
1190            MykoMessage::Command(wrapped) => {
1191                // Extract tx from the command JSON
1192                let tx_id: Arc<str> = wrapped
1193                    .command
1194                    .get("tx")
1195                    .and_then(|v| v.as_str())
1196                    .unwrap_or("unknown")
1197                    .into();
1198
1199                let command_id = &wrapped.command_id;
1200
1201                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1202                let received_at = Instant::now();
1203                if let Ok(mut map) = command_started_by_tx.lock() {
1204                    map.insert(tx_id.clone(), received_at);
1205                }
1206                if let Err(e) = command_tx.send(CommandJob {
1207                    tx_id: tx_id.clone(),
1208                    command_id: wrapped.command_id.clone(),
1209                    command: wrapped.command.clone(),
1210                    received_at,
1211                }) {
1212                    log::error!(
1213                        "Failed to enqueue command {} for client {} tx={}: {}",
1214                        command_id,
1215                        session.client_id,
1216                        tx_id,
1217                        e
1218                    );
1219                    let error = MykoMessage::CommandError(CommandError {
1220                        tx: tx_id.to_string(),
1221                        command_id: command_id.to_string(),
1222                        message: "Command queue unavailable".to_string(),
1223                    });
1224                    if let Err(err) = priority_tx.try_send(error) {
1225                        drop_logger.on_drop("CommandError", &err);
1226                    }
1227                }
1228            }
1229
1230            MykoMessage::Ping(PingData { id, timestamp }) => {
1231                // Echo back the ping data
1232                let pong = MykoMessage::Ping(PingData { id, timestamp });
1233                if let Err(e) = priority_tx.try_send(pong) {
1234                    drop_logger.on_drop("Ping", &e);
1235                }
1236            }
1237
1238            // Response messages - these shouldn't come from clients.
1239            MykoMessage::QueryResponse(resp) => {
1240                log::warn!(
1241                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1242                    session.client_id,
1243                    resp.tx,
1244                    resp.sequence,
1245                    resp.upserts.len(),
1246                    resp.deletes.len(),
1247                    session.subscription_count()
1248                );
1249            }
1250            MykoMessage::QueryError(err) => {
1251                log::warn!(
1252                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1253                    session.client_id,
1254                    err.tx,
1255                    err.query_id,
1256                    err.message,
1257                    session.subscription_count()
1258                );
1259            }
1260            MykoMessage::ViewResponse(resp) => {
1261                log::warn!(
1262                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1263                    session.client_id,
1264                    resp.tx,
1265                    resp.sequence,
1266                    resp.upserts.len(),
1267                    resp.deletes.len(),
1268                    session.subscription_count()
1269                );
1270            }
1271            MykoMessage::ViewError(err) => {
1272                log::warn!(
1273                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1274                    session.client_id,
1275                    err.tx,
1276                    err.view_id,
1277                    err.message,
1278                    session.subscription_count()
1279                );
1280            }
1281            MykoMessage::ReportResponse(resp) => {
1282                log::warn!(
1283                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1284                    session.client_id,
1285                    resp.tx,
1286                    session.subscription_count()
1287                );
1288            }
1289            MykoMessage::ReportError(err) => {
1290                log::warn!(
1291                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1292                    session.client_id,
1293                    err.tx,
1294                    err.report_id,
1295                    err.message,
1296                    session.subscription_count()
1297                );
1298            }
1299            MykoMessage::CommandResponse(resp) => {
1300                if resp.tx.trim().is_empty() {
1301                    log::warn!(
1302                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1303                        session.client_id,
1304                        session.subscription_count()
1305                    );
1306                } else {
1307                    let correlated = outbound_commands_by_tx
1308                        .lock()
1309                        .ok()
1310                        .and_then(|mut map| map.remove(&resp.tx));
1311                    if let Some((command_id, started)) = correlated {
1312                        log::debug!(
1313                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1314                            session.client_id,
1315                            resp.tx,
1316                            command_id,
1317                            started.elapsed().as_millis(),
1318                            session.subscription_count()
1319                        );
1320                    } else {
1321                        log::warn!(
1322                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1323                            session.client_id,
1324                            resp.tx,
1325                            session.subscription_count()
1326                        );
1327                    }
1328                }
1329            }
1330            MykoMessage::CommandError(err) => {
1331                if err.tx.trim().is_empty() {
1332                    log::warn!(
1333                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1334                        session.client_id,
1335                        err.command_id,
1336                        err.message,
1337                        session.subscription_count()
1338                    );
1339                } else {
1340                    let correlated = outbound_commands_by_tx
1341                        .lock()
1342                        .ok()
1343                        .and_then(|mut map| map.remove(&err.tx));
1344                    if let Some((command_id, started)) = correlated {
1345                        log::warn!(
1346                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1347                            session.client_id,
1348                            err.tx,
1349                            err.command_id,
1350                            command_id,
1351                            err.message,
1352                            started.elapsed().as_millis(),
1353                            session.subscription_count()
1354                        );
1355                    } else {
1356                        log::warn!(
1357                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1358                            session.client_id,
1359                            err.tx,
1360                            err.command_id,
1361                            err.message,
1362                            session.subscription_count()
1363                        );
1364                    }
1365                }
1366            }
1367            MykoMessage::Benchmark(payload) => {
1368                let stats = ws_benchmark_stats();
1369                ensure_ws_benchmark_logger();
1370                stats.message_count.fetch_add(1, Ordering::Relaxed);
1371                // Estimate payload size from the JSON value
1372                let size = payload.to_string().len() as u64;
1373                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1374            }
1375            MykoMessage::ProtocolSwitch { protocol } => {
1376                log::warn!(
1377                    "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1378                    session.client_id,
1379                    protocol,
1380                    session.subscription_count()
1381                );
1382            }
1383        }
1384
1385        Ok(())
1386    }
1387
1388    fn execute_command_job(
1389        ctx: Arc<CellServerCtx>,
1390        priority_tx: &mpsc::Sender<MykoMessage>,
1391        drop_logger: &DropLogger,
1392        client_id: Arc<str>,
1393        job: CommandJob,
1394    ) {
1395        let host_id = ctx.host_id;
1396        let started = Instant::now();
1397        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1398        let command_id = job.command_id.clone();
1399
1400        let mut handler_found = false;
1401        for registration in inventory::iter::<CommandHandlerRegistration> {
1402            if registration.command_id == command_id {
1403                handler_found = true;
1404                let executor = (registration.factory)();
1405
1406                let req = Arc::new(RequestContext::from_client(
1407                    job.tx_id.clone(),
1408                    client_id.clone(),
1409                    host_id,
1410                ));
1411                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1412                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1413                let execute_started = Instant::now();
1414
1415                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1416                    Ok(result) => {
1417                        let response = MykoMessage::CommandResponse(CommandResponse {
1418                            response: result,
1419                            tx: job.tx_id.to_string(),
1420                        });
1421                        if let Err(e) = priority_tx.try_send(response) {
1422                            drop_logger.on_drop("CommandResponse", &e);
1423                        }
1424                    }
1425                    Err(e) => {
1426                        let error = MykoMessage::CommandError(CommandError {
1427                            tx: job.tx_id.to_string(),
1428                            command_id: command_id.clone(),
1429                            message: e.message,
1430                        });
1431                        if let Err(err) = priority_tx.try_send(error) {
1432                            drop_logger.on_drop("CommandError", &err);
1433                        }
1434                    }
1435                }
1436                let execute_ms = execute_started.elapsed().as_millis();
1437                let total_ms = job.received_at.elapsed().as_millis();
1438                log::trace!(
1439                    target: "myko_server::ws_perf",
1440                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1441                    client_id,
1442                    job.tx_id,
1443                    command_id,
1444                    queue_wait_ms,
1445                    execute_ms,
1446                    total_ms
1447                );
1448                break;
1449            }
1450        }
1451
1452        if !handler_found {
1453            log::warn!("No registered handler for command: {}", command_id);
1454            let error = MykoMessage::CommandError(CommandError {
1455                tx: job.tx_id.to_string(),
1456                command_id: command_id.clone(),
1457                message: format!("Command handler not found: {}", command_id),
1458            });
1459            if let Err(e) = priority_tx.try_send(error) {
1460                drop_logger.on_drop("CommandError", &e);
1461            }
1462        }
1463
1464        if !handler_found {
1465            log::debug!(
1466                target: "myko_server::ws_perf",
1467                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1468                client_id,
1469                job.tx_id,
1470                command_id,
1471                queue_wait_ms,
1472                job.received_at.elapsed().as_millis()
1473            );
1474        }
1475    }
1476}
1477
/// Channel-based WebSocket writer.
///
/// Sends messages through an mpsc channel which are then
/// forwarded to the actual WebSocket.
struct ChannelWriter {
    /// Channel for regular outbound traffic; `send` and
    /// `send_serialized_command` enqueue here.
    tx: mpsc::Sender<OutboundMessage>,
    /// Channel for deferred outbound items; `send_report_response` and
    /// `send_query_response` enqueue here.
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Receives an `on_drop` call whenever a `try_send` on either channel fails.
    drop_logger: Arc<DropLogger>,
    /// When `true`, `protocol()` reports MSGPACK; otherwise it reports JSON.
    use_binary_writer: Arc<AtomicBool>,
}
1488
1489impl WsWriter for ChannelWriter {
1490    fn send(&self, msg: MykoMessage) {
1491        // Use try_send since we're in sync context
1492        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1493            self.drop_logger.on_drop("message", &e);
1494        }
1495    }
1496
1497    fn protocol(&self) -> myko::client::MykoProtocol {
1498        if self.use_binary_writer.load(Ordering::SeqCst) {
1499            myko::client::MykoProtocol::MSGPACK
1500        } else {
1501            myko::client::MykoProtocol::JSON
1502        }
1503    }
1504
1505    fn send_serialized_command(
1506        &self,
1507        tx: Arc<str>,
1508        command_id: String,
1509        payload: EncodedCommandMessage,
1510    ) {
1511        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1512            tx,
1513            command_id,
1514            payload,
1515        }) {
1516            self.drop_logger.on_drop("serialized_command", &e);
1517        }
1518    }
1519
1520    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1521        if let Err(e) = self
1522            .deferred_tx
1523            .try_send(DeferredOutbound::Report(tx, output))
1524        {
1525            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1526        }
1527    }
1528
1529    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1530        if let Err(e) = self
1531            .deferred_tx
1532            .try_send(DeferredOutbound::Query { response, is_view })
1533        {
1534            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1535        }
1536    }
1537}
1538
#[cfg(test)]
mod tests {
    use super::*;

    /// A message handed to `ChannelWriter::send` must arrive on the primary
    /// outbound channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is kept alive but never read in this test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let delivered = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }
}