Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    command::{CommandContext, CommandHandlerRegistration},
22    entities::client::{Client, ClientId},
23    relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24    report::AnyOutput,
25    request::RequestContext,
26    server::{
27        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28        client_registry::try_client_registry,
29    },
30    wire::{
31        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33    },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37    accept_async_with_config,
38    tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
/// Text frame a client sends to opt in to binary (msgpack) encoding.
///
/// Keep this in sync with `ProtocolMessages.SwitchToMSGPACK` in the
/// TypeScript client.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
46struct WsIngestStats {
47    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
48}
49
/// Running totals reported and reset once a second by the benchmark logger
/// thread.
struct WsBenchmarkStats {
    /// Messages counted since the last report.
    message_count: AtomicU64,
    /// Bytes counted since the last report.
    total_bytes: AtomicU64,
}
54
55static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
56static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
57static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
58static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    if count > 0 { bytes / count } else { 0 },
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[fallback_to_id] fields with the entity's own id
192    // if the field is null or missing.
193    if let Some(obj) = event.item.as_object_mut()
194        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195    {
196        for reg in iter_fallback_to_id_registrations() {
197            if reg.entity_type == event.item_type {
198                let field = reg.field_name_json;
199                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201                }
202            }
203        }
204    }
205}
206
207/// Per-connection drop tracking to avoid log storms when clients fall behind.
208///
209/// When the outbound channel is full, we will drop messages (same as today),
210/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
211struct DropLogger {
212    client_id: Arc<str>,
213    dropped: std::sync::atomic::AtomicU64,
214    last_log_ms: std::sync::atomic::AtomicU64,
215}
216
217impl DropLogger {
218    fn new(client_id: Arc<str>) -> Self {
219        Self {
220            client_id,
221            dropped: std::sync::atomic::AtomicU64::new(0),
222            last_log_ms: std::sync::atomic::AtomicU64::new(0),
223        }
224    }
225
226    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227        use std::sync::atomic::Ordering;
228
229        self.dropped.fetch_add(1, Ordering::Relaxed);
230
231        // Log at most once per second per connection.
232        let now_ms = std::time::SystemTime::now()
233            .duration_since(std::time::UNIX_EPOCH)
234            .unwrap_or_default()
235            .as_millis() as u64;
236        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237        if now_ms.saturating_sub(last_ms) < 1000 {
238            return;
239        }
240
241        if self
242            .last_log_ms
243            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244            .is_err()
245        {
246            return;
247        }
248
249        let n = self.dropped.swap(0, Ordering::Relaxed);
250        log::warn!(
251            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252            n,
253            self.client_id,
254            kind,
255            err
256        );
257    }
258}
259
260struct CommandJob {
261    tx_id: Arc<str>,
262    command_id: String,
263    command: serde_json::Value,
264    received_at: Instant,
265}
266
267/// Result of an async subscription build (query or view cell_factory).
268enum SubscriptionReady {
269    Query {
270        tx_id: Arc<str>,
271        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
272        window: Option<myko::wire::QueryWindow>,
273    },
274    View {
275        tx_id: Arc<str>,
276        view_id: Arc<str>,
277        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
278        window: Option<myko::wire::QueryWindow>,
279    },
280}
281
282enum OutboundMessage {
283    Message(MykoMessage),
284    SerializedCommand {
285        tx: Arc<str>,
286        command_id: String,
287        payload: EncodedCommandMessage,
288    },
289}
290
291enum DeferredOutbound {
292    Report(Arc<str>, Arc<dyn AnyOutput>),
293    Query {
294        response: PendingQueryResponse,
295        is_view: bool,
296    },
297}
298
/// Stateless entry point for serving a single WebSocket client connection.
///
/// All functionality lives in associated functions; the type carries no data.
pub struct WsHandler;
301
302impl WsHandler {
303    /// Handle a new WebSocket connection.
304    #[allow(clippy::too_many_arguments)]
305    pub async fn handle_connection(
306        stream: TcpStream,
307        addr: SocketAddr,
308        ctx: Arc<CellServerCtx>,
309    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310        ensure_ws_ingest_logger();
311
312        let host_id = ctx.host_id;
313
314        let mut ws_config = WebSocketConfig::default();
315        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318        let (mut write, mut read) = ws_stream.split();
319
320        // Create a bounded channel for sending messages to the client
321        // High limit (10k) since we have good memory availability
322        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328        // Protocol: default to JSON, switch to binary only if client opts in
329        let use_binary = Arc::new(AtomicBool::new(false));
330
331        // Create client session with channel-based writer
332        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334        let writer = ChannelWriter {
335            tx: tx.clone(),
336            deferred_tx: deferred_tx.clone(),
337            drop_logger: drop_logger.clone(),
338            use_binary_writer: use_binary.clone(),
339        };
340
341        // Register writer in the global client registry (if initialized)
342        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343            tx: tx.clone(),
344            deferred_tx: deferred_tx.clone(),
345            drop_logger: drop_logger.clone(),
346            use_binary_writer: use_binary.clone(),
347        });
348        if let Some(registry) = try_client_registry() {
349            registry.register(client_id.clone(), writer_arc);
350        }
351
352        let mut session = ClientSession::new(client_id.clone(), writer);
353
354        let use_binary_writer = use_binary.clone();
355        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356            Arc::new(Mutex::new(HashMap::new()));
357        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358            Arc::new(Mutex::new(HashMap::new()));
359        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360            Arc::new(Mutex::new(HashMap::new()));
361        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362            Arc::new(Mutex::new(HashMap::new()));
363        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364            Arc::new(Mutex::new(HashMap::new()));
365
366        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368        // Publish Client entity
369        let client_entity = Client {
370            id: ClientId(client_id.clone()),
371            server_id: host_id.to_string().into(),
372            windback: None,
373        };
374        if let Err(e) = ctx.set(&client_entity) {
375            log::error!("Failed to persist client entity: {e}");
376        }
377
378        log::info!("Client connected: {} from {}", client_id, addr);
379
380        let write_ctx = ctx.clone();
381        let write_client_id = client_id.clone();
382        let write_addr = addr;
383        let command_ctx = ctx.clone();
384        let command_priority_tx = priority_tx.clone();
385        let command_drop_logger = drop_logger.clone();
386        let command_client_id = client_id.clone();
387
388        // Spawn task to forward messages from channel to WebSocket
389        let write_task = tokio::spawn(async move {
390            let _ctx = write_ctx;
391            let mut normal_open = true;
392            let mut priority_open = true;
393            let mut deferred_open = true;
394            while normal_open || priority_open || deferred_open {
395                let msg = tokio::select! {
396                    biased;
397                    maybe = priority_rx.recv(), if priority_open => {
398                        match maybe {
399                            Some(msg) => OutboundMessage::Message(msg),
400                            None => {
401                                priority_open = false;
402                                continue;
403                            }
404                        }
405                    }
406                    maybe = deferred_rx.recv(), if deferred_open => {
407                        match maybe {
408                            Some(DeferredOutbound::Report(tx, output)) => {
409                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
410                                    response: output.to_value(),
411                                    tx: tx.to_string(),
412                                }))
413                            }
414                            Some(DeferredOutbound::Query { response, is_view }) => {
415                                if is_view {
416                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
417                                } else {
418                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
419                                }
420                            }
421                            None => {
422                                deferred_open = false;
423                                continue;
424                            }
425                        }
426                    }
427                    maybe = rx.recv(), if normal_open => {
428                        match maybe {
429                            Some(msg) => msg,
430                            None => {
431                                normal_open = false;
432                                continue;
433                            }
434                        }
435                    }
436                };
437                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
438                    OutboundMessage::SerializedCommand { tx, .. } => {
439                        ("command", Some(tx.clone()), None, None, None, None)
440                    }
441                    OutboundMessage::Message(msg) => match msg {
442                        MykoMessage::ViewResponse(r) => (
443                            "view_response",
444                            Some(r.tx.clone()),
445                            Some(r.sequence),
446                            Some(r.upserts.len()),
447                            Some(r.deletes.len()),
448                            r.total_count,
449                        ),
450                        MykoMessage::QueryResponse(r) => (
451                            "query_response",
452                            Some(r.tx.clone()),
453                            Some(r.sequence),
454                            Some(r.upserts.len()),
455                            Some(r.deletes.len()),
456                            r.total_count,
457                        ),
458                        MykoMessage::CommandResponse(r) => (
459                            "command_response",
460                            Some(Arc::<str>::from(r.tx.clone())),
461                            None,
462                            None,
463                            None,
464                            None,
465                        ),
466                        MykoMessage::CommandError(r) => (
467                            "command_error",
468                            Some(Arc::<str>::from(r.tx.clone())),
469                            None,
470                            None,
471                            None,
472                            None,
473                        ),
474                        _ => ("other", None, None, None, None, None),
475                    },
476                };
477
478                match &msg {
479                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
480                        if !tx.trim().is_empty()
481                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
482                        {
483                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
484                        }
485                    }
486                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
487                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
488                            && !tx_id.trim().is_empty()
489                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
490                        {
491                            map.insert(
492                                tx_id.to_string(),
493                                (wrapped.command_id.clone(), Instant::now()),
494                            );
495                        }
496                    }
497                    _ => {}
498                }
499
500                let ws_msg = match &msg {
501                    OutboundMessage::SerializedCommand {
502                        payload: EncodedCommandMessage::Json(json),
503                        ..
504                    } => Message::Text(json.clone().into()),
505                    OutboundMessage::SerializedCommand {
506                        payload: EncodedCommandMessage::Msgpack(bytes),
507                        ..
508                    } => Message::Binary(bytes.clone().into()),
509                    OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
510                        match rmp_serde::to_vec(msg) {
511                            Ok(bytes) => Message::Binary(bytes.into()),
512                            Err(e) => {
513                                log::error!("Failed to serialize message to msgpack: {}", e);
514                                continue;
515                            }
516                        }
517                    }
518                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
519                        Ok(json) => Message::Text(json.into()),
520                        Err(e) => {
521                            log::error!("Failed to serialize message to JSON: {}", e);
522                            continue;
523                        }
524                    },
525                };
526                let payload_bytes = match &ws_msg {
527                    Message::Binary(b) => b.len(),
528                    Message::Text(t) => t.len(),
529                    _ => 0,
530                };
531
532                if let Err(err) = write.send(ws_msg).await {
533                    log::error!(
534                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
535                        write_client_id,
536                        write_addr,
537                        kind,
538                        tx_id,
539                        seq,
540                        payload_bytes,
541                        use_binary_writer.load(Ordering::SeqCst),
542                        err
543                    );
544                    break;
545                }
546            }
547            log::warn!(
548                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
549                write_client_id,
550                write_addr,
551                normal_open,
552                priority_open,
553                deferred_open
554            );
555        });
556
557        // Execute commands on a dedicated worker so ping/cancel traffic is never
558        // blocked by long-running command handlers.
559        let command_started_cleanup = command_started_by_tx.clone();
560        let command_task = tokio::spawn(async move {
561            while let Some(job) = command_rx.recv().await {
562                let command_ctx = command_ctx.clone();
563                let command_priority_tx = command_priority_tx.clone();
564                let command_drop_logger = command_drop_logger.clone();
565                let command_client_id = command_client_id.clone();
566                let tx_id = job.tx_id.clone();
567                let started_map = command_started_cleanup.clone();
568                match tokio::task::spawn_blocking(move || {
569                    Self::execute_command_job(
570                        command_ctx,
571                        &command_priority_tx,
572                        command_drop_logger.as_ref(),
573                        command_client_id,
574                        job,
575                    );
576                })
577                .await
578                {
579                    Ok(()) => {}
580                    Err(e) => {
581                        log::error!("Command worker panicked: {}", e);
582                    }
583                }
584                // NOTE(ts): Clean up timing entry after command completes (success or panic).
585                if let Ok(mut map) = started_map.lock() {
586                    map.remove(&tx_id);
587                }
588            }
589        });
590
591        // Process incoming messages and completed subscription builds concurrently.
592        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
593        // so they don't block command processing or other messages.
594        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
595        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
596        loop {
597            tokio::select! {
598                // Completed subscription builds — register with session
599                Some(ready) = subscribe_rx.recv() => {
600                    let tx_id = match &ready {
601                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
602                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
603                    };
604                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
605                        map.remove(&tx_id);
606                    }
607                    match ready {
608                        SubscriptionReady::Query { tx_id, cellmap, window } => {
609                            session.subscribe_query(tx_id, cellmap, window);
610                        }
611                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
612                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
613                        }
614                    }
615                }
616                // NOTE(ts): Sweep outbound command entries older than 10s.
617                // Responses normally arrive quickly; stale entries are from
618                // dropped connections or commands that will never get a response.
619                _ = outbound_ttl_interval.tick() => {
620                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
621                        let before = map.len();
622                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
623                        let removed = before - map.len();
624                        if removed > 0 {
625                            log::debug!(
626                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
627                                session.client_id,
628                                removed,
629                                map.len()
630                            );
631                        }
632                    }
633                }
634                // Incoming WebSocket messages
635                msg = read.next() => {
636                    let Some(msg) = msg else { break };
637                    let ctx = ctx.clone();
638                    let msg = match msg {
639                        Ok(m) => m,
640                        Err(e) => {
641                            log::error!("WebSocket read error from {}: {}", client_id, e);
642                            break;
643                        }
644                    };
645
646                    match msg {
647                        Message::Binary(data) => {
648                            if !use_binary.load(Ordering::SeqCst) {
649                                log::debug!(
650                                    "Client {} switched to binary (msgpack) protocol via auto-detect",
651                                    client_id
652                                );
653                                use_binary.store(true, Ordering::SeqCst);
654                            }
655
656                            match rmp_serde::from_slice::<MykoMessage>(&data) {
657                                Ok(myko_msg) => {
658                                    if let Err(e) = Self::handle_message(
659                                        &mut session,
660                                        ctx,
661                                        &priority_tx,
662                                        &drop_logger,
663                                        &query_ids_by_tx,
664                                        &view_ids_by_tx,
665                                        &subscribe_started_by_tx,
666                                        &command_started_by_tx,
667                                        &outbound_commands_by_tx,
668                                        &command_tx,
669                                        &subscribe_tx,
670                                        myko_msg,
671                                    ) {
672                                        log::error!("Error handling message: {}", e);
673                                    }
674                                    tokio::task::yield_now().await;
675                                }
676                                Err(e) => {
677                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
678                                }
679                            }
680                        }
681                        Message::Text(text) => {
682                            if text == SWITCH_TO_MSGPACK {
683                                log::debug!(
684                                    "Client {} switched to binary (msgpack) protocol via explicit request",
685                                    client_id
686                                );
687                                if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
688                                    protocol: "msgpack".into(),
689                                }) {
690                                    drop_logger.on_drop("ProtocolSwitch", &e);
691                                }
692                                use_binary.store(true, Ordering::SeqCst);
693                                continue;
694                            }
695
696                            match serde_json::from_str::<MykoMessage>(&text) {
697                                Ok(myko_msg) => {
698                                    if let Err(e) = Self::handle_message(
699                                        &mut session,
700                                        ctx,
701                                        &priority_tx,
702                                        &drop_logger,
703                                        &query_ids_by_tx,
704                                        &view_ids_by_tx,
705                                        &subscribe_started_by_tx,
706                                        &command_started_by_tx,
707                                        &outbound_commands_by_tx,
708                                        &command_tx,
709                                        &subscribe_tx,
710                                        myko_msg,
711                                    ) {
712                                        log::error!("Error handling message: {}", e);
713                                    }
714                                    tokio::task::yield_now().await;
715                                }
716                                Err(e) => {
717                                    log::warn!(
718                                        "Failed to parse JSON message from {}: {} | raw: {}",
719                                        client_id,
720                                        e,
721                                        if text.len() > 1000 {
722                                            &text[..1000]
723                                        } else {
724                                            &text
725                                        }
726                                    );
727                                }
728                            }
729                        }
730                        Message::Ping(data) => {
731                            log::trace!("Ping from {}", client_id);
732                            let _ = data;
733                        }
734                        Message::Pong(_) => {
735                            log::trace!("Pong from {}", client_id);
736                        }
737                        Message::Close(frame) => {
738                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
739                            break;
740                        }
741                        Message::Frame(_) => {}
742                    }
743                }
744            }
745        }
746
747        // Cleanup
748        drop(session); // Drops all subscription guards
749        write_task.abort();
750        command_task.abort();
751
752        // Unregister from client registry
753        if let Some(registry) = try_client_registry() {
754            registry.unregister(&client_id);
755        }
756
757        // Delete Client entity
758        if let Err(e) = ctx.del(&client_entity) {
759            log::error!("Failed to delete client entity: {e}");
760        }
761
762        log::info!("Client disconnected: {}", client_id);
763
764        Ok(())
765    }
766
767    /// Handle a parsed MykoMessage.
    ///
    /// Dispatches on the message variant:
    /// - `Query` / `View`: ignore duplicates for an already-subscribed tx, record per-tx
    ///   bookkeeping, parse the request through the handler registry, and run the cell
    ///   factory via `tokio::task::spawn_blocking`; the result is sent on `subscribe_tx`.
    /// - `Report`: parse and build the report cell inline, then subscribe the session.
    /// - `QueryCancel` / `ViewCancel` / `ReportCancel`: cancel the session subscription
    ///   (the query/view variants also clear the per-tx bookkeeping maps).
    /// - `QueryWindow` / `ViewWindow`: forward the new window to the session.
    /// - `Event` / `EventBatch`: normalize and apply to `ctx`.
    /// - `Command`: enqueue a `CommandJob` on `command_tx`; if enqueueing fails a
    ///   `CommandError` is sent on `priority_tx`.
    /// - `Ping`: echoed back on `priority_tx`.
    /// - `Benchmark`: update the benchmark counters.
    /// - Response/error/protocol-switch variants: only logged; command responses and
    ///   errors are additionally correlated against `outbound_commands_by_tx`.
    ///
    /// Failures inside the arms are logged (and sometimes reported to the client) rather
    /// than propagated: every path in this function returns `Ok(())`.
768    #[allow(clippy::too_many_arguments)]
769    fn handle_message<W: WsWriter>(
770        session: &mut ClientSession<W>,
771        ctx: Arc<CellServerCtx>,
772        priority_tx: &mpsc::Sender<MykoMessage>,
773        drop_logger: &Arc<DropLogger>,
774        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
775        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
776        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
777        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
778        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
779        command_tx: &mpsc::UnboundedSender<CommandJob>,
780        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
781        msg: MykoMessage,
782    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // These are taken up front for every message kind, including the arms that
        // never use them.
783        let handler_registry = ctx.handler_registry.clone();
784
785        let registry = ctx.registry.clone();
786
787        let host_id = ctx.host_id;
788
789        match msg {
790            MykoMessage::Query(wrapped) => {
791                // Extract tx from the query JSON
                // A missing or non-string "tx" falls back to the literal "unknown".
792                let tx_id: Arc<str> = wrapped
793                    .query
794                    .get("tx")
795                    .and_then(|v| v.as_str())
796                    .unwrap_or("unknown")
797                    .into();
798                let query_id = &wrapped.query_id;
799                let entity_type = &wrapped.query_item_type;
800
801                // Defensive dedupe: some clients can replay the same subscribe request.
802                // A duplicate for the same tx would reset server-side sequence/window state.
803                if session.has_subscription(&tx_id) {
804                    log::debug!(
805                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
806                        session.client_id,
807                        tx_id,
808                        query_id,
809                        entity_type
810                    );
811                    return Ok(());
812                }
                // Per-tx bookkeeping. If a lock cannot be acquired (`lock()` returns
                // `Err`), the update is skipped silently.
813                if let Ok(mut map) = query_ids_by_tx.lock() {
814                    map.insert(tx_id.clone(), query_id.clone());
815                }
                // `or_insert_with` keeps an already-recorded start time for this tx.
816                if let Ok(mut map) = subscribe_started_by_tx.lock() {
817                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
818                }
819
820                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
821                log::trace!(
822                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
823                    session.client_id,
824                    tx_id,
825                    query_id,
826                    entity_type,
827                    wrapped.window.is_some(),
828                    session.subscription_count()
829                );
830
831                let request_context = Arc::new(RequestContext::from_client(
832                    tx_id.clone(),
833                    session.client_id.clone(),
834                    host_id,
835                ));
836
837                if let Some(query_data) = handler_registry.get_query(query_id) {
                    // The payload is cloned for parsing because `wrapped.query` is still
                    // needed for the error log below.
838                    let parsed = (query_data.parse)(wrapped.query.clone());
839                    match parsed {
840                        Ok(any_query) => {
841                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
842                            // block command processing or other messages.
                            // Everything the closure needs is cloned/moved into owned
                            // values here; `tx_id` itself is moved into the closure.
843                            let cell_factory = query_data.cell_factory;
844                            let registry = registry.clone();
845                            let request_context = request_context.clone();
846                            let ctx = ctx.clone();
847                            let window = wrapped.window.clone();
848                            let query_id = query_id.clone();
849                            let sub_tx = subscribe_tx.clone();
850                            tokio::task::spawn_blocking(move || {
851                                match cell_factory(any_query, registry, request_context, Some(ctx))
852                                {
853                                    Ok(filtered_cellmap) => {
                                        // The session is not touched from this closure;
                                        // the result is handed back over the channel. A
                                        // failed send is ignored.
854                                        let _ = sub_tx.send(SubscriptionReady::Query {
855                                            tx_id,
856                                            cellmap: filtered_cellmap,
857                                            window,
858                                        });
859                                    }
860                                    Err(e) => {
                                        // Only logged: this arm sends nothing back to the
                                        // client on failure (the View arm sends ViewError).
                                        // NOTE(review): the bookkeeping entries inserted
                                        // above are not removed on this path within this
                                        // function — confirm cleanup happens elsewhere.
861                                        log::error!(
862                                            "Failed to create query cell for {}: {}",
863                                            query_id,
864                                            e
865                                        );
866                                    }
867                                }
868                            });
869                        }
870                        Err(e) => {
                            // Parse failure is only logged; no message is sent to the client.
871                            log::error!(
872                                "Failed to parse query {}: {} | payload: {}",
873                                query_id,
874                                e,
875                                serde_json::to_string(&wrapped.query).unwrap_or_default()
876                            );
877                        }
878                    }
879                } else {
880                    // Fall back to select all for unknown queries
881                    log::warn!(
882                        "No registered query handler for {}, falling back to select all",
883                        query_id
884                    );
                    // Unlike the registered path, this subscribes on the session directly
                    // instead of going through `subscribe_tx`.
885                    let cellmap = registry.get_or_create(entity_type).select(|_| true);
886                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
887                }
888            }
889
890            MykoMessage::View(wrapped) => {
                // Extract tx from the view JSON; falls back to the literal "unknown".
891                let tx_id: Arc<str> = wrapped
892                    .view
893                    .get("tx")
894                    .and_then(|v| v.as_str())
895                    .unwrap_or("unknown")
896                    .into();
897                let view_id = &wrapped.view_id;
898                let item_type = &wrapped.view_item_type;
899
900                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
901                if session.has_subscription(&tx_id) {
902                    log::debug!(
903                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
904                        session.client_id,
905                        tx_id,
906                        view_id,
907                        item_type
908                    );
909                    return Ok(());
910                }
                // Per-tx bookkeeping, same pattern as the Query arm.
911                if let Ok(mut map) = view_ids_by_tx.lock() {
912                    map.insert(tx_id.clone(), view_id.clone());
913                }
914                if let Ok(mut map) = subscribe_started_by_tx.lock() {
915                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
916                }
917
918                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
919                log::trace!(
920                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
921                    session.client_id,
922                    tx_id,
923                    view_id,
924                    item_type,
925                    wrapped.window
926                );
927
928                let request_context = Arc::new(RequestContext::from_client(
929                    tx_id.clone(),
930                    session.client_id.clone(),
931                    host_id,
932                ));
933
934                if let Some(view_data) = handler_registry.get_view(view_id) {
935                    let parsed = (view_data.parse)(wrapped.view.clone());
936                    match parsed {
937                        Ok(any_view) => {
938                            log::trace!(
939                                "View parsed successfully client={} tx={} view_id={}",
940                                session.client_id,
941                                tx_id,
942                                view_id
943                            );
944                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
945                            // block command processing or other messages.
946                            let cell_factory = view_data.cell_factory;
947                            let registry = registry.clone();
948                            let ctx = ctx.clone();
949                            let window = wrapped.window.clone();
950                            let view_id_clone = view_id.clone();
951                            let sub_tx = subscribe_tx.clone();
                            // The closure also needs the priority channel and drop logger
                            // so it can report a factory failure to the client.
952                            let priority_tx = priority_tx.clone();
953                            let drop_logger = drop_logger.clone();
954                            tokio::task::spawn_blocking(move || {
                                // Unlike the query factory call, `ctx` is passed directly
                                // rather than wrapped in `Some`.
955                                match cell_factory(any_view, registry, request_context, ctx) {
956                                    Ok(filtered_cellmap) => {
957                                        log::trace!(
958                                            "View cell factory succeeded tx={} view_id={}",
959                                            tx_id,
960                                            view_id_clone
961                                        );
962                                        let _ = sub_tx.send(SubscriptionReady::View {
963                                            tx_id,
964                                            view_id: view_id_clone,
965                                            cellmap: filtered_cellmap,
966                                            window,
967                                        });
968                                    }
969                                    Err(e) => {
970                                        log::error!(
971                                            "Failed to create view cell for {}: {}",
972                                            view_id_clone,
973                                            e
974                                        );
                                        // `try_send` does not wait; if it fails the error
                                        // message is dropped and the drop is recorded.
975                                        if let Err(err) = priority_tx.try_send(
976                                            MykoMessage::ViewError(ViewError {
977                                                tx: tx_id.to_string(),
978                                                view_id: view_id_clone.to_string(),
979                                                message: e,
980                                            }),
981                                        ) {
982                                            drop_logger.on_drop("ViewError", &err);
983                                        }
984                                    }
985                                }
986                            });
987                        }
988                        Err(e) => {
                            // Parse failure: log with the payload and report to the client.
989                            let message = format!("Failed to parse view {}: {}", view_id, e);
990                            log::error!(
991                                "{} | payload: {}",
992                                message,
993                                serde_json::to_string(&wrapped.view).unwrap_or_default()
994                            );
995                            if let Err(err) =
996                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
997                                    tx: tx_id.to_string(),
998                                    view_id: view_id.to_string(),
999                                    message,
1000                                }))
1001                            {
1002                                drop_logger.on_drop("ViewError", &err);
1003                            }
1004                        }
1005                    }
1006                } else {
                    // Unlike Query, there is no select-all fallback for unknown views;
                    // the client gets a ViewError instead.
1007                    let message = format!("No registered handler for view: {}", view_id);
1008                    log::warn!("{}", message);
1009                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1010                        tx: tx_id.to_string(),
1011                        view_id: view_id.to_string(),
1012                        message,
1013                    })) {
1014                        drop_logger.on_drop("ViewError", &err);
1015                    }
1016                }
1017            }
1018
1019            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1020                log::trace!(
1021                    "QueryCancel received: client={} tx={}",
1022                    session.client_id,
1023                    tx_id
1024                );
                // Clear the per-tx bookkeeping before cancelling on the session.
1025                let tx_id: Arc<str> = tx_id.into();
1026                if let Ok(mut map) = query_ids_by_tx.lock() {
1027                    map.remove(&tx_id);
1028                }
1029                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1030                    map.remove(&tx_id);
1031                }
1032                session.cancel(&tx_id);
1033            }
1034
1035            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1036                let tx_id: Arc<str> = tx.into();
1037                log::trace!(
1038                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1039                    session.client_id,
1040                    tx_id,
1041                    window.is_some(),
1042                    session.subscription_count()
1043                );
1044                session.update_query_window(&tx_id, window);
1045            }
1046            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1047                log::trace!("View cancel: {}", tx_id);
                // Mirrors QueryCancel, using the view id map.
1048                let tx_id: Arc<str> = tx_id.into();
1049                if let Ok(mut map) = view_ids_by_tx.lock() {
1050                    map.remove(&tx_id);
1051                }
1052                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1053                    map.remove(&tx_id);
1054                }
1055                session.cancel(&tx_id);
1056            }
1057            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1058                let tx_id: Arc<str> = tx.into();
1059                log::trace!("View window update: {}", tx_id);
1060                session.update_view_window(&tx_id, window);
1061            }
1062
1063            MykoMessage::Report(wrapped) => {
1064                // Extract tx from the report JSON
1065                let tx_id: Arc<str> = wrapped
1066                    .report
1067                    .get("tx")
1068                    .and_then(|v| v.as_str())
1069                    .unwrap_or("unknown")
1070                    .into();
1071                let report_id = &wrapped.report_id;
1072
                // Note: this arm performs no duplicate-tx check and records nothing in
                // the per-tx bookkeeping maps.
1073                log::trace!(
1074                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1075                    session.client_id,
1076                    tx_id,
1077                    report_id,
1078                    session.subscription_count()
1079                );
1080
1081                // Look up the report registration
1082                if let Some(report_data) = handler_registry.get_report(report_id) {
1083                    // Parse the report JSON to the concrete type
1084                    let parsed = (report_data.parse)(wrapped.report.clone());
1085                    match parsed {
1086                        Ok(any_report) => {
1087                            let request_context = Arc::new(RequestContext::from_client(
1088                                tx_id.clone(),
1089                                session.client_id.clone(),
1090                                host_id,
1091                            ));
1092
1093                            // Create the cell using the factory (with host_id for context)
                            // Unlike Query/View, the factory is called inline here (no
                            // spawn_blocking) and `ctx` is moved into the call.
1094                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1095                                Ok(cell) => {
1096                                    session.subscribe_report(
1097                                        tx_id,
1098                                        report_id.as_str().into(),
1099                                        cell,
1100                                    );
1101                                }
1102                                Err(e) => {
                                    // Only logged; nothing is sent to the client here.
1103                                    log::error!(
1104                                        "Failed to create report cell for {}: {}",
1105                                        report_id,
1106                                        e
1107                                    );
1108                                }
1109                            }
1110                        }
1111                        Err(e) => {
1112                            log::error!(
1113                                "Failed to parse report {}: {} | payload: {}",
1114                                report_id,
1115                                e,
1116                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1117                            );
1118                        }
1119                    }
1120                } else {
1121                    log::warn!("No registered handler for report: {}", report_id);
1122                }
1123            }
1124
1125            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1126                log::trace!(
1127                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1128                    session.client_id,
1129                    tx_id,
1130                    session.subscription_count()
1131                );
1132                session.cancel(&tx_id.into());
1133            }
1134
1135            MykoMessage::Event(mut event) => {
                // Ingest stats are recorded before the event is normalized.
1136                record_ws_ingest(std::slice::from_ref(&event));
1137                normalize_incoming_event(&mut event, &session.client_id);
1138                if let Err(e) = ctx.apply_event(event) {
1139                    log::error!(
1140                        "Failed to apply event from client {}: {e}",
1141                        session.client_id
1142                    );
1143                }
1144            }
1145
1146            MykoMessage::EventBatch(mut events) => {
1147                record_ws_ingest(&events);
1148                let incoming = events.len();
                // Receipt is only traced for batches of 64 events or more.
1149                if incoming >= 64 {
1150                    log::trace!(
1151                        "Received event batch from client {} size={}",
1152                        session.client_id,
1153                        incoming
1154                    );
1155                }
1156                for event in &mut events {
1157                    normalize_incoming_event(event, &session.client_id);
1158                }
1159                match ctx.apply_event_batch(events) {
1160                    Ok(applied) => {
1161                        log::trace!(
1162                            "Applied event batch from client {} size={}",
1163                            session.client_id,
1164                            applied
1165                        );
1166                    }
1167                    Err(e) => {
1168                        log::error!(
1169                            "Failed to apply event batch from client {}: {}",
1170                            session.client_id,
1171                            e
1172                        );
1173                    }
1174                }
1175            }
1176
1177            MykoMessage::Command(wrapped) => {
1178                // Extract tx from the command JSON
1179                let tx_id: Arc<str> = wrapped
1180                    .command
1181                    .get("tx")
1182                    .and_then(|v| v.as_str())
1183                    .unwrap_or("unknown")
1184                    .into();
1185
1186                let command_id = &wrapped.command_id;
1187
1188                log::trace!("Command {} (tx: {})", command_id, tx_id,);
                // The same receive timestamp is stored in the map and carried on the job.
1189                let received_at = Instant::now();
1190                if let Ok(mut map) = command_started_by_tx.lock() {
1191                    map.insert(tx_id.clone(), received_at);
1192                }
                // The command is not executed here; it is only queued on `command_tx`.
1193                if let Err(e) = command_tx.send(CommandJob {
1194                    tx_id: tx_id.clone(),
1195                    command_id: wrapped.command_id.clone(),
1196                    command: wrapped.command.clone(),
1197                    received_at,
1198                }) {
1199                    log::error!(
1200                        "Failed to enqueue command {} for client {} tx={}: {}",
1201                        command_id,
1202                        session.client_id,
1203                        tx_id,
1204                        e
1205                    );
                    // Tell the client the command could not be queued.
1206                    let error = MykoMessage::CommandError(CommandError {
1207                        tx: tx_id.to_string(),
1208                        command_id: command_id.to_string(),
1209                        message: "Command queue unavailable".to_string(),
1210                    });
1211                    if let Err(err) = priority_tx.try_send(error) {
1212                        drop_logger.on_drop("CommandError", &err);
1213                    }
1214                }
1215            }
1216
1217            MykoMessage::Ping(PingData { id, timestamp }) => {
1218                // Echo back the ping data
1219                let pong = MykoMessage::Ping(PingData { id, timestamp });
1220                if let Err(e) = priority_tx.try_send(pong) {
1221                    drop_logger.on_drop("Ping", &e);
1222                }
1223            }
1224
1225            // Response messages - these shouldn't come from clients.
1226            MykoMessage::QueryResponse(resp) => {
1227                log::warn!(
1228                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1229                    session.client_id,
1230                    resp.tx,
1231                    resp.sequence,
1232                    resp.upserts.len(),
1233                    resp.deletes.len(),
1234                    session.subscription_count()
1235                );
1236            }
1237            MykoMessage::QueryError(err) => {
1238                log::warn!(
1239                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1240                    session.client_id,
1241                    err.tx,
1242                    err.query_id,
1243                    err.message,
1244                    session.subscription_count()
1245                );
1246            }
1247            MykoMessage::ViewResponse(resp) => {
1248                log::warn!(
1249                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1250                    session.client_id,
1251                    resp.tx,
1252                    resp.sequence,
1253                    resp.upserts.len(),
1254                    resp.deletes.len(),
1255                    session.subscription_count()
1256                );
1257            }
1258            MykoMessage::ViewError(err) => {
1259                log::warn!(
1260                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1261                    session.client_id,
1262                    err.tx,
1263                    err.view_id,
1264                    err.message,
1265                    session.subscription_count()
1266                );
1267            }
1268            MykoMessage::ReportResponse(resp) => {
1269                log::warn!(
1270                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1271                    session.client_id,
1272                    resp.tx,
1273                    session.subscription_count()
1274                );
1275            }
1276            MykoMessage::ReportError(err) => {
1277                log::warn!(
1278                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1279                    session.client_id,
1280                    err.tx,
1281                    err.report_id,
1282                    err.message,
1283                    session.subscription_count()
1284                );
1285            }
            // Command responses/errors from the client are matched against commands
            // recorded in `outbound_commands_by_tx`.
1286            MykoMessage::CommandResponse(resp) => {
1287                if resp.tx.trim().is_empty() {
1288                    log::warn!(
1289                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1290                        session.client_id,
1291                        session.subscription_count()
1292                    );
1293                } else {
                    // `remove` consumes the pending entry, so a repeated response for the
                    // same tx takes the "without outbound match" branch. A failed lock is
                    // treated the same as no match.
1294                    let correlated = outbound_commands_by_tx
1295                        .lock()
1296                        .ok()
1297                        .and_then(|mut map| map.remove(&resp.tx));
1298                    if let Some((command_id, started)) = correlated {
1299                        log::debug!(
1300                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1301                            session.client_id,
1302                            resp.tx,
1303                            command_id,
1304                            started.elapsed().as_millis(),
1305                            session.subscription_count()
1306                        );
1307                    } else {
1308                        log::warn!(
1309                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1310                            session.client_id,
1311                            resp.tx,
1312                            session.subscription_count()
1313                        );
1314                    }
1315                }
1316            }
1317            MykoMessage::CommandError(err) => {
1318                if err.tx.trim().is_empty() {
1319                    log::warn!(
1320                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1321                        session.client_id,
1322                        err.command_id,
1323                        err.message,
1324                        session.subscription_count()
1325                    );
1326                } else {
                    // Same correlation as CommandResponse; both outcomes log at warn.
1327                    let correlated = outbound_commands_by_tx
1328                        .lock()
1329                        .ok()
1330                        .and_then(|mut map| map.remove(&err.tx));
1331                    if let Some((command_id, started)) = correlated {
1332                        log::warn!(
1333                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1334                            session.client_id,
1335                            err.tx,
1336                            err.command_id,
1337                            command_id,
1338                            err.message,
1339                            started.elapsed().as_millis(),
1340                            session.subscription_count()
1341                        );
1342                    } else {
1343                        log::warn!(
1344                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1345                            session.client_id,
1346                            err.tx,
1347                            err.command_id,
1348                            err.message,
1349                            session.subscription_count()
1350                        );
1351                    }
1352                }
1353            }
1354            MykoMessage::Benchmark(payload) => {
                // Nothing is sent back; this arm only updates the benchmark counters.
1355                let stats = ws_benchmark_stats();
1356                ensure_ws_benchmark_logger();
1357                stats.message_count.fetch_add(1, Ordering::Relaxed);
1358                // Estimate payload size from the JSON value
                // (the payload is rendered to a string solely to measure its length).
1359                let size = payload.to_string().len() as u64;
1360                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1361            }
1362            MykoMessage::ProtocolSwitch { protocol } => {
                // Logged as unexpected here; no protocol change is performed in this arm.
1363                log::warn!(
1364                    "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1365                    session.client_id,
1366                    protocol,
1367                    session.subscription_count()
1368                );
1369            }
1370        }
1371
1372        Ok(())
1373    }
1374
1375    fn execute_command_job(
1376        ctx: Arc<CellServerCtx>,
1377        priority_tx: &mpsc::Sender<MykoMessage>,
1378        drop_logger: &DropLogger,
1379        client_id: Arc<str>,
1380        job: CommandJob,
1381    ) {
1382        let host_id = ctx.host_id;
1383        let started = Instant::now();
1384        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1385        let command_id = job.command_id.clone();
1386
1387        let mut handler_found = false;
1388        for registration in inventory::iter::<CommandHandlerRegistration> {
1389            if registration.command_id == command_id {
1390                handler_found = true;
1391                let executor = (registration.factory)();
1392
1393                let req = Arc::new(RequestContext::from_client(
1394                    job.tx_id.clone(),
1395                    client_id.clone(),
1396                    host_id,
1397                ));
1398                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1399                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1400                let execute_started = Instant::now();
1401
1402                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1403                    Ok(result) => {
1404                        let response = MykoMessage::CommandResponse(CommandResponse {
1405                            response: result,
1406                            tx: job.tx_id.to_string(),
1407                        });
1408                        if let Err(e) = priority_tx.try_send(response) {
1409                            drop_logger.on_drop("CommandResponse", &e);
1410                        }
1411                    }
1412                    Err(e) => {
1413                        let error = MykoMessage::CommandError(CommandError {
1414                            tx: job.tx_id.to_string(),
1415                            command_id: command_id.clone(),
1416                            message: e.message,
1417                        });
1418                        if let Err(err) = priority_tx.try_send(error) {
1419                            drop_logger.on_drop("CommandError", &err);
1420                        }
1421                    }
1422                }
1423                let execute_ms = execute_started.elapsed().as_millis();
1424                let total_ms = job.received_at.elapsed().as_millis();
1425                log::trace!(
1426                    target: "myko_server::ws_perf",
1427                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1428                    client_id,
1429                    job.tx_id,
1430                    command_id,
1431                    queue_wait_ms,
1432                    execute_ms,
1433                    total_ms
1434                );
1435                break;
1436            }
1437        }
1438
1439        if !handler_found {
1440            log::warn!("No registered handler for command: {}", command_id);
1441            let error = MykoMessage::CommandError(CommandError {
1442                tx: job.tx_id.to_string(),
1443                command_id: command_id.clone(),
1444                message: format!("Command handler not found: {}", command_id),
1445            });
1446            if let Err(e) = priority_tx.try_send(error) {
1447                drop_logger.on_drop("CommandError", &e);
1448            }
1449        }
1450
1451        if !handler_found {
1452            log::debug!(
1453                target: "myko_server::ws_perf",
1454                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1455                client_id,
1456                job.tx_id,
1457                command_id,
1458                queue_wait_ms,
1459                job.received_at.elapsed().as_millis()
1460            );
1461        }
1462    }
1463}
1464
1465/// Channel-based WebSocket writer.
1466///
1467/// Sends messages through an mpsc channel which are then
1468/// forwarded to the actual WebSocket.
1469struct ChannelWriter {
1470    tx: mpsc::Sender<OutboundMessage>,
1471    deferred_tx: mpsc::Sender<DeferredOutbound>,
1472    drop_logger: Arc<DropLogger>,
1473    use_binary_writer: Arc<AtomicBool>,
1474}
1475
1476impl WsWriter for ChannelWriter {
1477    fn send(&self, msg: MykoMessage) {
1478        // Use try_send since we're in sync context
1479        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1480            self.drop_logger.on_drop("message", &e);
1481        }
1482    }
1483
1484    fn protocol(&self) -> myko::client::MykoProtocol {
1485        if self.use_binary_writer.load(Ordering::SeqCst) {
1486            myko::client::MykoProtocol::MSGPACK
1487        } else {
1488            myko::client::MykoProtocol::JSON
1489        }
1490    }
1491
1492    fn send_serialized_command(
1493        &self,
1494        tx: Arc<str>,
1495        command_id: String,
1496        payload: EncodedCommandMessage,
1497    ) {
1498        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1499            tx,
1500            command_id,
1501            payload,
1502        }) {
1503            self.drop_logger.on_drop("serialized_command", &e);
1504        }
1505    }
1506
1507    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1508        if let Err(e) = self
1509            .deferred_tx
1510            .try_send(DeferredOutbound::Report(tx, output))
1511        {
1512            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1513        }
1514    }
1515
1516    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1517        if let Err(e) = self
1518            .deferred_tx
1519            .try_send(DeferredOutbound::Query { response, is_view })
1520        {
1521            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1522        }
1523    }
1524}
1525
#[cfg(test)]
mod tests {
    use super::*;

    /// A message handed to `ChannelWriter::send` arrives on the primary
    /// outbound channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is kept alive but unused by this test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let delivered = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }
}