Skip to main content

myko_server/
ws_handler.rs

1//! WebSocket handler for the cell-based server.
2//!
3//! Handles WebSocket connections using ClientSession for subscription management.
4
5use std::{
6    collections::HashMap,
7    net::SocketAddr,
8    sync::{
9        Arc, Mutex, OnceLock,
10        atomic::{AtomicBool, AtomicU64, Ordering},
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20    WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21    command::{CommandContext, CommandHandlerRegistration},
22    entities::client::{Client, ClientId},
23    relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24    report::AnyOutput,
25    request::RequestContext,
26    server::{
27        CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28        client_registry::try_client_registry,
29    },
30    wire::{
31        CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32        MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33    },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37    accept_async_with_config,
38    tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
42/// Protocol switch message sent by client to enable binary (msgpack) encoding.
43/// Must match ProtocolMessages.SwitchToMSGPACK in TypeScript client.
44const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
/// Per-entity-type counters for events received over WebSocket connections.
///
/// `record_ws_ingest` increments the counters; the thread started by
/// `ensure_ws_ingest_logger` swaps them back to zero once per second and logs
/// the non-zero ones.
struct WsIngestStats {
    /// Event count since the last logger pass, keyed by the event's `item_type`.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Message and byte totals reported by the thread started in
/// `ensure_ws_benchmark_logger`, which swaps both counters back to zero once
/// per second.
///
/// NOTE(review): the code that increments these counters is not in the visible
/// part of this file — confirm where they are fed.
struct WsBenchmarkStats {
    /// Messages counted since the logger last reset this counter.
    message_count: AtomicU64,
    /// Bytes counted since the logger last reset this counter.
    total_bytes: AtomicU64,
}
54
/// Process-wide ingest counters, created on first use by `ws_ingest_stats`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
/// Flipped to `true` by the first `ensure_ws_ingest_logger` call so the ingest
/// logger thread is spawned at most once.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
/// Process-wide benchmark counters, created on first use by `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Flipped to `true` by the first `ensure_ws_benchmark_logger` call so the
/// benchmark logger thread is spawned at most once.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61    WS_BENCHMARK_STATS
62        .get_or_init(|| {
63            Arc::new(WsBenchmarkStats {
64                message_count: AtomicU64::new(0),
65                total_bytes: AtomicU64::new(0),
66            })
67        })
68        .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72    if WS_BENCHMARK_LOGGER_STARTED
73        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74        .is_err()
75    {
76        return;
77    }
78
79    let stats = ws_benchmark_stats();
80    thread::Builder::new()
81        .name("ws-benchmark-logger".into())
82        .spawn(move || {
83            loop {
84                thread::sleep(Duration::from_secs(1));
85
86                let count = stats.message_count.swap(0, Ordering::Relaxed);
87                let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89                if count == 0 {
90                    continue;
91                }
92
93                log::info!(
94                    "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95                    count,
96                    bytes,
97                    if count > 0 { bytes / count } else { 0 },
98                );
99            }
100        })
101        .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105    WS_INGEST_STATS
106        .get_or_init(|| {
107            Arc::new(WsIngestStats {
108                counts_by_type: DashMap::new(),
109            })
110        })
111        .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115    if WS_INGEST_LOGGER_STARTED
116        .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117        .is_err()
118    {
119        return;
120    }
121
122    let stats = ws_ingest_stats();
123    thread::Builder::new()
124        .name("ws-ingest-logger".into())
125        .spawn(move || {
126            loop {
127                thread::sleep(Duration::from_secs(1));
128
129                let mut per_type = Vec::new();
130                let mut total = 0u64;
131                for entry in &stats.counts_by_type {
132                    let count = entry.value().swap(0, Ordering::Relaxed);
133                    if count == 0 {
134                        continue;
135                    }
136                    total += count;
137                    per_type.push((entry.key().clone(), count));
138                }
139
140                if total == 0 {
141                    continue;
142                }
143
144                per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145                let details = per_type
146                    .into_iter()
147                    .map(|(entity_type, count)| format!("{entity_type}={count}"))
148                    .collect::<Vec<_>>()
149                    .join(" ");
150
151                log::info!("WebSocket ingest last_1s total={} {}", total, details);
152            }
153        })
154        .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158    if events.is_empty() {
159        return;
160    }
161
162    let stats = ws_ingest_stats();
163    for event in events {
164        let counter = stats
165            .counts_by_type
166            .entry(Arc::<str>::from(event.item_type.as_str()))
167            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168            .clone();
169        counter.fetch_add(1, Ordering::Relaxed);
170    }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174    if event.change_type != MEventType::SET {
175        return;
176    }
177
178    // Auto-populate #[myko_client_id] fields with the connection's client_id
179    for reg in iter_client_id_registrations() {
180        if reg.entity_type == event.item_type {
181            if let Some(obj) = event.item.as_object_mut() {
182                obj.insert(
183                    reg.field_name_json.to_string(),
184                    serde_json::Value::String(client_id.to_string()),
185                );
186            }
187            break;
188        }
189    }
190
191    // Auto-populate #[fallback_to_id] fields with the entity's own id
192    // if the field is null or missing.
193    if let Some(obj) = event.item.as_object_mut()
194        && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195    {
196        for reg in iter_fallback_to_id_registrations() {
197            if reg.entity_type == event.item_type {
198                let field = reg.field_name_json;
199                if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200                    obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201                }
202            }
203        }
204    }
205}
206
207/// Per-connection drop tracking to avoid log storms when clients fall behind.
208///
209/// When the outbound channel is full, we will drop messages (same as today),
210/// but we must not `warn!` for every drop or we can effectively DoS ourselves.
211struct DropLogger {
212    client_id: Arc<str>,
213    dropped: std::sync::atomic::AtomicU64,
214    last_log_ms: std::sync::atomic::AtomicU64,
215}
216
217impl DropLogger {
218    fn new(client_id: Arc<str>) -> Self {
219        Self {
220            client_id,
221            dropped: std::sync::atomic::AtomicU64::new(0),
222            last_log_ms: std::sync::atomic::AtomicU64::new(0),
223        }
224    }
225
226    fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227        use std::sync::atomic::Ordering;
228
229        self.dropped.fetch_add(1, Ordering::Relaxed);
230
231        // Log at most once per second per connection.
232        let now_ms = std::time::SystemTime::now()
233            .duration_since(std::time::UNIX_EPOCH)
234            .unwrap_or_default()
235            .as_millis() as u64;
236        let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237        if now_ms.saturating_sub(last_ms) < 1000 {
238            return;
239        }
240
241        if self
242            .last_log_ms
243            .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244            .is_err()
245        {
246            return;
247        }
248
249        let n = self.dropped.swap(0, Ordering::Relaxed);
250        log::warn!(
251            "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252            n,
253            self.client_id,
254            kind,
255            err
256        );
257    }
258}
259
/// A command queued for the per-connection command worker.
///
/// Jobs travel over the unbounded `command_tx` channel created in
/// `handle_connection`; the worker hands each one to `execute_command_job` on
/// the blocking thread pool.
struct CommandJob {
    /// Transaction id of the request; the worker also uses it to remove the
    /// command's entry from the started-at map once the job finishes.
    tx_id: Arc<str>,
    /// Identifier of the command to run.
    /// NOTE(review): presumably the key used to find the command handler — confirm.
    command_id: String,
    /// The command payload as JSON.
    command: serde_json::Value,
    /// When the job was created.
    /// NOTE(review): presumably used for latency reporting — confirm.
    received_at: Instant,
}
266
/// Result of an async subscription build (query or view cell_factory).
///
/// Sent back to the connection's main loop over `subscribe_tx`; the loop then
/// registers the finished cell map with the `ClientSession`.
enum SubscriptionReady {
    /// A query subscription, registered via `ClientSession::subscribe_query`.
    Query {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// The built cell map backing the subscription.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window passed through to the session.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view subscription, registered via `ClientSession::subscribe_view_with_id`.
    View {
        /// Transaction id of the subscribe request.
        tx_id: Arc<str>,
        /// Identifier of the view being subscribed to.
        view_id: Arc<str>,
        /// The built cell map backing the subscription.
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        /// Optional window passed through to the session.
        window: Option<myko::wire::QueryWindow>,
    },
}
281
/// An item on the normal outbound channel, consumed by the writer task in
/// `handle_connection`.
enum OutboundMessage {
    /// A message the writer task serializes itself, as msgpack or JSON
    /// depending on the connection's current protocol flag.
    Message(MykoMessage),
    /// A command that is already encoded; the writer task sends the payload
    /// as-is (text frame for JSON, binary frame for msgpack).
    SerializedCommand {
        /// Transaction id; when non-blank the writer records it in the
        /// outbound-commands map together with `command_id` and the send time.
        tx: Arc<str>,
        /// Identifier of the command being sent.
        command_id: String,
        /// The pre-encoded wire payload.
        payload: EncodedCommandMessage,
    },
}
290
/// An item on the deferred outbound channel. The writer task in
/// `handle_connection` converts it to its wire form only when it is about to
/// be sent.
enum DeferredOutbound {
    /// A report result for the given transaction id; sent as a
    /// `MykoMessage::ReportResponse` built from `AnyOutput::to_value`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// A pending query or view update, converted with `into_wire`.
    Query {
        /// The response to convert.
        response: PendingQueryResponse,
        /// `true` sends it as `MykoMessage::ViewResponse`, `false` as
        /// `MykoMessage::QueryResponse`.
        is_view: bool,
    },
}
298
/// WebSocket handler for a single client connection.
///
/// A unit struct with no state of its own: it only namespaces the associated
/// functions (`handle_connection`, `handle_message`, …) that drive a connection.
pub struct WsHandler;
301
302impl WsHandler {
303    /// Handle a new WebSocket connection.
304    #[allow(clippy::too_many_arguments)]
305    pub async fn handle_connection(
306        stream: TcpStream,
307        addr: SocketAddr,
308        ctx: Arc<CellServerCtx>,
309    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310        ensure_ws_ingest_logger();
311
312        let host_id = ctx.host_id;
313
314        let mut ws_config = WebSocketConfig::default();
315        ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316        ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317        let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318        let (mut write, mut read) = ws_stream.split();
319
320        // Create a bounded channel for sending messages to the client
321        // High limit (10k) since we have good memory availability
322        let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323        let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324        let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325        let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326        let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328        // Protocol: default to JSON, switch to binary only if client opts in
329        let use_binary = Arc::new(AtomicBool::new(false));
330
331        // Create client session with channel-based writer
332        let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333        let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334        let writer = ChannelWriter {
335            tx: tx.clone(),
336            deferred_tx: deferred_tx.clone(),
337            drop_logger: drop_logger.clone(),
338            use_binary_writer: use_binary.clone(),
339        };
340
341        // Register writer in the global client registry (if initialized)
342        let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343            tx: tx.clone(),
344            deferred_tx: deferred_tx.clone(),
345            drop_logger: drop_logger.clone(),
346            use_binary_writer: use_binary.clone(),
347        });
348        if let Some(registry) = try_client_registry() {
349            registry.register(client_id.clone(), writer_arc);
350        }
351
352        let mut session = ClientSession::new(client_id.clone(), writer);
353
354        let use_binary_writer = use_binary.clone();
355        let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356            Arc::new(Mutex::new(HashMap::new()));
357        let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358            Arc::new(Mutex::new(HashMap::new()));
359        let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360            Arc::new(Mutex::new(HashMap::new()));
361        let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362            Arc::new(Mutex::new(HashMap::new()));
363        let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364            Arc::new(Mutex::new(HashMap::new()));
365
366        let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368        // Publish Client entity
369        let client_entity = Client {
370            id: ClientId(client_id.clone()),
371            server_id: host_id.to_string().into(),
372            address: Some(Arc::from(addr.to_string())),
373            windback: None,
374        };
375        if let Err(e) = ctx.set(&client_entity) {
376            log::error!("Failed to persist client entity: {e}");
377        }
378
379        log::info!("Client connected: {} from {}", client_id, addr);
380
381        let write_ctx = ctx.clone();
382        let write_client_id = client_id.clone();
383        let write_addr = addr;
384        let command_ctx = ctx.clone();
385        let command_priority_tx = priority_tx.clone();
386        let command_drop_logger = drop_logger.clone();
387        let command_client_id = client_id.clone();
388
389        // Spawn task to forward messages from channel to WebSocket
390        let write_task = tokio::spawn(async move {
391            let _ctx = write_ctx;
392            let mut normal_open = true;
393            let mut priority_open = true;
394            let mut deferred_open = true;
395            while normal_open || priority_open || deferred_open {
396                let msg = tokio::select! {
397                    biased;
398                    maybe = priority_rx.recv(), if priority_open => {
399                        match maybe {
400                            Some(msg) => OutboundMessage::Message(msg),
401                            None => {
402                                priority_open = false;
403                                continue;
404                            }
405                        }
406                    }
407                    maybe = deferred_rx.recv(), if deferred_open => {
408                        match maybe {
409                            Some(DeferredOutbound::Report(tx, output)) => {
410                                OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
411                                    response: output.to_value(),
412                                    tx: tx.to_string(),
413                                }))
414                            }
415                            Some(DeferredOutbound::Query { response, is_view }) => {
416                                if is_view {
417                                    OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
418                                } else {
419                                    OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
420                                }
421                            }
422                            None => {
423                                deferred_open = false;
424                                continue;
425                            }
426                        }
427                    }
428                    maybe = rx.recv(), if normal_open => {
429                        match maybe {
430                            Some(msg) => msg,
431                            None => {
432                                normal_open = false;
433                                continue;
434                            }
435                        }
436                    }
437                };
438                let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
439                    OutboundMessage::SerializedCommand { tx, .. } => {
440                        ("command", Some(tx.clone()), None, None, None, None)
441                    }
442                    OutboundMessage::Message(msg) => match msg {
443                        MykoMessage::ViewResponse(r) => (
444                            "view_response",
445                            Some(r.tx.clone()),
446                            Some(r.sequence),
447                            Some(r.upserts.len()),
448                            Some(r.deletes.len()),
449                            r.total_count,
450                        ),
451                        MykoMessage::QueryResponse(r) => (
452                            "query_response",
453                            Some(r.tx.clone()),
454                            Some(r.sequence),
455                            Some(r.upserts.len()),
456                            Some(r.deletes.len()),
457                            r.total_count,
458                        ),
459                        MykoMessage::CommandResponse(r) => (
460                            "command_response",
461                            Some(Arc::<str>::from(r.tx.clone())),
462                            None,
463                            None,
464                            None,
465                            None,
466                        ),
467                        MykoMessage::CommandError(r) => (
468                            "command_error",
469                            Some(Arc::<str>::from(r.tx.clone())),
470                            None,
471                            None,
472                            None,
473                            None,
474                        ),
475                        _ => ("other", None, None, None, None, None),
476                    },
477                };
478
479                match &msg {
480                    OutboundMessage::SerializedCommand { tx, command_id, .. } => {
481                        if !tx.trim().is_empty()
482                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
483                        {
484                            map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
485                        }
486                    }
487                    OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
488                        if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
489                            && !tx_id.trim().is_empty()
490                            && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
491                        {
492                            map.insert(
493                                tx_id.to_string(),
494                                (wrapped.command_id.clone(), Instant::now()),
495                            );
496                        }
497                    }
498                    _ => {}
499                }
500
501                let ws_msg = match &msg {
502                    OutboundMessage::SerializedCommand {
503                        payload: EncodedCommandMessage::Json(json),
504                        ..
505                    } => Message::Text(json.clone().into()),
506                    OutboundMessage::SerializedCommand {
507                        payload: EncodedCommandMessage::Msgpack(bytes),
508                        ..
509                    } => Message::Binary(bytes.clone().into()),
510                    OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
511                        match rmp_serde::to_vec(msg) {
512                            Ok(bytes) => Message::Binary(bytes.into()),
513                            Err(e) => {
514                                log::error!("Failed to serialize message to msgpack: {}", e);
515                                continue;
516                            }
517                        }
518                    }
519                    OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
520                        Ok(json) => Message::Text(json.into()),
521                        Err(e) => {
522                            log::error!("Failed to serialize message to JSON: {}", e);
523                            continue;
524                        }
525                    },
526                };
527                let payload_bytes = match &ws_msg {
528                    Message::Binary(b) => b.len(),
529                    Message::Text(t) => t.len(),
530                    _ => 0,
531                };
532
533                if let Err(err) = write.send(ws_msg).await {
534                    log::error!(
535                        "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
536                        write_client_id,
537                        write_addr,
538                        kind,
539                        tx_id,
540                        seq,
541                        payload_bytes,
542                        use_binary_writer.load(Ordering::SeqCst),
543                        err
544                    );
545                    break;
546                }
547            }
548            log::warn!(
549                "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
550                write_client_id,
551                write_addr,
552                normal_open,
553                priority_open,
554                deferred_open
555            );
556        });
557
558        // Execute commands on a dedicated worker so ping/cancel traffic is never
559        // blocked by long-running command handlers.
560        let command_started_cleanup = command_started_by_tx.clone();
561        let command_task = tokio::spawn(async move {
562            while let Some(job) = command_rx.recv().await {
563                let command_ctx = command_ctx.clone();
564                let command_priority_tx = command_priority_tx.clone();
565                let command_drop_logger = command_drop_logger.clone();
566                let command_client_id = command_client_id.clone();
567                let tx_id = job.tx_id.clone();
568                let started_map = command_started_cleanup.clone();
569                match tokio::task::spawn_blocking(move || {
570                    Self::execute_command_job(
571                        command_ctx,
572                        &command_priority_tx,
573                        command_drop_logger.as_ref(),
574                        command_client_id,
575                        job,
576                    );
577                })
578                .await
579                {
580                    Ok(()) => {}
581                    Err(e) => {
582                        log::error!("Command worker panicked: {}", e);
583                    }
584                }
585                // NOTE(ts): Clean up timing entry after command completes (success or panic).
586                if let Ok(mut map) = started_map.lock() {
587                    map.remove(&tx_id);
588                }
589            }
590        });
591
592        // Process incoming messages and completed subscription builds concurrently.
593        // NOTE(ts): View/query cell_factory calls are spawned on the blocking thread pool
594        // so they don't block command processing or other messages.
595        let mut outbound_ttl_interval = interval(Duration::from_secs(10));
596        outbound_ttl_interval.tick().await; // NOTE(ts): consume the immediate first tick
597        loop {
598            tokio::select! {
599                // Completed subscription builds — register with session
600                Some(ready) = subscribe_rx.recv() => {
601                    let tx_id = match &ready {
602                        SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
603                        SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
604                    };
605                    if let Ok(mut map) = subscribe_started_by_tx.lock() {
606                        map.remove(&tx_id);
607                    }
608                    match ready {
609                        SubscriptionReady::Query { tx_id, cellmap, window } => {
610                            session.subscribe_query(tx_id, cellmap, window);
611                        }
612                        SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
613                            session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
614                        }
615                    }
616                }
617                // NOTE(ts): Sweep outbound command entries older than 10s.
618                // Responses normally arrive quickly; stale entries are from
619                // dropped connections or commands that will never get a response.
620                _ = outbound_ttl_interval.tick() => {
621                    if let Ok(mut map) = outbound_commands_by_tx.lock() {
622                        let before = map.len();
623                        map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
624                        let removed = before - map.len();
625                        if removed > 0 {
626                            log::debug!(
627                                "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
628                                session.client_id,
629                                removed,
630                                map.len()
631                            );
632                        }
633                    }
634                }
635                // Incoming WebSocket messages
636                msg = read.next() => {
637                    let Some(msg) = msg else { break };
638                    let ctx = ctx.clone();
639                    let msg = match msg {
640                        Ok(m) => m,
641                        Err(e) => {
642                            log::error!("WebSocket read error from {}: {}", client_id, e);
643                            break;
644                        }
645                    };
646
647                    match msg {
648                        Message::Binary(data) => {
649                            if !use_binary.load(Ordering::SeqCst) {
650                                log::debug!(
651                                    "Client {} switched to binary (msgpack) protocol via auto-detect",
652                                    client_id
653                                );
654                                use_binary.store(true, Ordering::SeqCst);
655                            }
656
657                            match rmp_serde::from_slice::<MykoMessage>(&data) {
658                                Ok(myko_msg) => {
659                                    if let Err(e) = Self::handle_message(
660                                        &mut session,
661                                        ctx,
662                                        &priority_tx,
663                                        &drop_logger,
664                                        &query_ids_by_tx,
665                                        &view_ids_by_tx,
666                                        &subscribe_started_by_tx,
667                                        &command_started_by_tx,
668                                        &outbound_commands_by_tx,
669                                        &command_tx,
670                                        &subscribe_tx,
671                                        myko_msg,
672                                    ) {
673                                        log::error!("Error handling message: {}", e);
674                                    }
675                                    tokio::task::yield_now().await;
676                                }
677                                Err(e) => {
678                                    log::warn!("Failed to parse message from {}: {}", client_id, e);
679                                }
680                            }
681                        }
682                        Message::Text(text) => {
683                            if text == SWITCH_TO_MSGPACK {
684                                log::debug!(
685                                    "Client {} switched to binary (msgpack) protocol via explicit request",
686                                    client_id
687                                );
688                                if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
689                                    protocol: "msgpack".into(),
690                                }) {
691                                    drop_logger.on_drop("ProtocolSwitch", &e);
692                                }
693                                use_binary.store(true, Ordering::SeqCst);
694                                continue;
695                            }
696
697                            match serde_json::from_str::<MykoMessage>(&text) {
698                                Ok(myko_msg) => {
699                                    if let Err(e) = Self::handle_message(
700                                        &mut session,
701                                        ctx,
702                                        &priority_tx,
703                                        &drop_logger,
704                                        &query_ids_by_tx,
705                                        &view_ids_by_tx,
706                                        &subscribe_started_by_tx,
707                                        &command_started_by_tx,
708                                        &outbound_commands_by_tx,
709                                        &command_tx,
710                                        &subscribe_tx,
711                                        myko_msg,
712                                    ) {
713                                        log::error!("Error handling message: {}", e);
714                                    }
715                                    tokio::task::yield_now().await;
716                                }
717                                Err(e) => {
718                                    log::warn!(
719                                        "Failed to parse JSON message from {}: {} | raw: {}",
720                                        client_id,
721                                        e,
722                                        if text.len() > 1000 {
723                                            &text[..1000]
724                                        } else {
725                                            &text
726                                        }
727                                    );
728                                }
729                            }
730                        }
731                        Message::Ping(data) => {
732                            log::trace!("Ping from {}", client_id);
733                            let _ = data;
734                        }
735                        Message::Pong(_) => {
736                            log::trace!("Pong from {}", client_id);
737                        }
738                        Message::Close(frame) => {
739                            log::warn!("Client {} sent close frame: {:?}", client_id, frame);
740                            break;
741                        }
742                        Message::Frame(_) => {}
743                    }
744                }
745            }
746        }
747
748        // Cleanup
749        drop(session); // Drops all subscription guards
750        write_task.abort();
751        command_task.abort();
752
753        // Unregister from client registry
754        if let Some(registry) = try_client_registry() {
755            registry.unregister(&client_id);
756        }
757
758        // Delete Client entity
759        if let Err(e) = ctx.del(&client_entity) {
760            log::error!("Failed to delete client entity: {e}");
761        }
762
763        log::info!("Client disconnected: {} from {}", client_id, addr);
764
765        Ok(())
766    }
767
768    /// Handle a parsed MykoMessage.
769    #[allow(clippy::too_many_arguments)]
770    fn handle_message<W: WsWriter>(
771        session: &mut ClientSession<W>,
772        ctx: Arc<CellServerCtx>,
773        priority_tx: &mpsc::Sender<MykoMessage>,
774        drop_logger: &Arc<DropLogger>,
775        query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
776        view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
777        subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
778        command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
779        outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
780        command_tx: &mpsc::UnboundedSender<CommandJob>,
781        subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
782        msg: MykoMessage,
783    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
784        let handler_registry = ctx.handler_registry.clone();
785
786        let registry = ctx.registry.clone();
787
788        let host_id = ctx.host_id;
789
790        match msg {
791            MykoMessage::Query(wrapped) => {
792                // Extract tx from the query JSON
793                let tx_id: Arc<str> = wrapped
794                    .query
795                    .get("tx")
796                    .and_then(|v| v.as_str())
797                    .unwrap_or("unknown")
798                    .into();
799                let query_id = &wrapped.query_id;
800                let entity_type = &wrapped.query_item_type;
801
802                // Defensive dedupe: some clients can replay the same subscribe request.
803                // A duplicate for the same tx would reset server-side sequence/window state.
804                if session.has_subscription(&tx_id) {
805                    log::debug!(
806                        "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
807                        session.client_id,
808                        tx_id,
809                        query_id,
810                        entity_type
811                    );
812                    return Ok(());
813                }
814                if let Ok(mut map) = query_ids_by_tx.lock() {
815                    map.insert(tx_id.clone(), query_id.clone());
816                }
817                if let Ok(mut map) = subscribe_started_by_tx.lock() {
818                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
819                }
820
821                log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
822                log::trace!(
823                    "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
824                    session.client_id,
825                    tx_id,
826                    query_id,
827                    entity_type,
828                    wrapped.window.is_some(),
829                    session.subscription_count()
830                );
831
832                let request_context = Arc::new(RequestContext::from_client(
833                    tx_id.clone(),
834                    session.client_id.clone(),
835                    host_id,
836                ));
837
838                if let Some(query_data) = handler_registry.get_query(query_id) {
839                    let parsed = (query_data.parse)(wrapped.query.clone());
840                    match parsed {
841                        Ok(any_query) => {
842                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
843                            // block command processing or other messages.
844                            let cell_factory = query_data.cell_factory;
845                            let registry = registry.clone();
846                            let request_context = request_context.clone();
847                            let ctx = ctx.clone();
848                            let window = wrapped.window.clone();
849                            let query_id = query_id.clone();
850                            let sub_tx = subscribe_tx.clone();
851                            tokio::task::spawn_blocking(move || {
852                                match cell_factory(any_query, registry, request_context, Some(ctx))
853                                {
854                                    Ok(filtered_cellmap) => {
855                                        let _ = sub_tx.send(SubscriptionReady::Query {
856                                            tx_id,
857                                            cellmap: filtered_cellmap,
858                                            window,
859                                        });
860                                    }
861                                    Err(e) => {
862                                        log::error!(
863                                            "Failed to create query cell for {}: {}",
864                                            query_id,
865                                            e
866                                        );
867                                    }
868                                }
869                            });
870                        }
871                        Err(e) => {
872                            log::error!(
873                                "Failed to parse query {}: {} | payload: {}",
874                                query_id,
875                                e,
876                                serde_json::to_string(&wrapped.query).unwrap_or_default()
877                            );
878                        }
879                    }
880                } else {
881                    // Fall back to select all for unknown queries
882                    log::warn!(
883                        "No registered query handler for {}, falling back to select all",
884                        query_id
885                    );
886                    let cellmap = registry.get_or_create(entity_type).select(|_| true);
887                    session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
888                }
889            }
890
891            MykoMessage::View(wrapped) => {
892                let tx_id: Arc<str> = wrapped
893                    .view
894                    .get("tx")
895                    .and_then(|v| v.as_str())
896                    .unwrap_or("unknown")
897                    .into();
898                let view_id = &wrapped.view_id;
899                let item_type = &wrapped.view_item_type;
900
901                // Defensive dedupe: ignore repeated subscribe for an already-active tx.
902                if session.has_subscription(&tx_id) {
903                    log::debug!(
904                        "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
905                        session.client_id,
906                        tx_id,
907                        view_id,
908                        item_type
909                    );
910                    return Ok(());
911                }
912                if let Ok(mut map) = view_ids_by_tx.lock() {
913                    map.insert(tx_id.clone(), view_id.clone());
914                }
915                if let Ok(mut map) = subscribe_started_by_tx.lock() {
916                    map.entry(tx_id.clone()).or_insert_with(Instant::now);
917                }
918
919                log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
920                log::trace!(
921                    "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
922                    session.client_id,
923                    tx_id,
924                    view_id,
925                    item_type,
926                    wrapped.window
927                );
928
929                let request_context = Arc::new(RequestContext::from_client(
930                    tx_id.clone(),
931                    session.client_id.clone(),
932                    host_id,
933                ));
934
935                if let Some(view_data) = handler_registry.get_view(view_id) {
936                    let parsed = (view_data.parse)(wrapped.view.clone());
937                    match parsed {
938                        Ok(any_view) => {
939                            log::trace!(
940                                "View parsed successfully client={} tx={} view_id={}",
941                                session.client_id,
942                                tx_id,
943                                view_id
944                            );
945                            // NOTE(ts): Spawn cell_factory on blocking pool so it doesn't
946                            // block command processing or other messages.
947                            let cell_factory = view_data.cell_factory;
948                            let registry = registry.clone();
949                            let ctx = ctx.clone();
950                            let window = wrapped.window.clone();
951                            let view_id_clone = view_id.clone();
952                            let sub_tx = subscribe_tx.clone();
953                            let priority_tx = priority_tx.clone();
954                            let drop_logger = drop_logger.clone();
955                            tokio::task::spawn_blocking(move || {
956                                match cell_factory(any_view, registry, request_context, ctx) {
957                                    Ok(filtered_cellmap) => {
958                                        log::trace!(
959                                            "View cell factory succeeded tx={} view_id={}",
960                                            tx_id,
961                                            view_id_clone
962                                        );
963                                        let _ = sub_tx.send(SubscriptionReady::View {
964                                            tx_id,
965                                            view_id: view_id_clone,
966                                            cellmap: filtered_cellmap,
967                                            window,
968                                        });
969                                    }
970                                    Err(e) => {
971                                        log::error!(
972                                            "Failed to create view cell for {}: {}",
973                                            view_id_clone,
974                                            e
975                                        );
976                                        if let Err(err) = priority_tx.try_send(
977                                            MykoMessage::ViewError(ViewError {
978                                                tx: tx_id.to_string(),
979                                                view_id: view_id_clone.to_string(),
980                                                message: e,
981                                            }),
982                                        ) {
983                                            drop_logger.on_drop("ViewError", &err);
984                                        }
985                                    }
986                                }
987                            });
988                        }
989                        Err(e) => {
990                            let message = format!("Failed to parse view {}: {}", view_id, e);
991                            log::error!(
992                                "{} | payload: {}",
993                                message,
994                                serde_json::to_string(&wrapped.view).unwrap_or_default()
995                            );
996                            if let Err(err) =
997                                priority_tx.try_send(MykoMessage::ViewError(ViewError {
998                                    tx: tx_id.to_string(),
999                                    view_id: view_id.to_string(),
1000                                    message,
1001                                }))
1002                            {
1003                                drop_logger.on_drop("ViewError", &err);
1004                            }
1005                        }
1006                    }
1007                } else {
1008                    let message = format!("No registered handler for view: {}", view_id);
1009                    log::warn!("{}", message);
1010                    if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1011                        tx: tx_id.to_string(),
1012                        view_id: view_id.to_string(),
1013                        message,
1014                    })) {
1015                        drop_logger.on_drop("ViewError", &err);
1016                    }
1017                }
1018            }
1019
1020            MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1021                log::trace!(
1022                    "QueryCancel received: client={} tx={}",
1023                    session.client_id,
1024                    tx_id
1025                );
1026                let tx_id: Arc<str> = tx_id.into();
1027                if let Ok(mut map) = query_ids_by_tx.lock() {
1028                    map.remove(&tx_id);
1029                }
1030                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1031                    map.remove(&tx_id);
1032                }
1033                session.cancel(&tx_id);
1034            }
1035
1036            MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1037                let tx_id: Arc<str> = tx.into();
1038                log::trace!(
1039                    "Query window request client={} tx={} has_window={} active_subscriptions={}",
1040                    session.client_id,
1041                    tx_id,
1042                    window.is_some(),
1043                    session.subscription_count()
1044                );
1045                session.update_query_window(&tx_id, window);
1046            }
1047            MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1048                log::trace!("View cancel: {}", tx_id);
1049                let tx_id: Arc<str> = tx_id.into();
1050                if let Ok(mut map) = view_ids_by_tx.lock() {
1051                    map.remove(&tx_id);
1052                }
1053                if let Ok(mut map) = subscribe_started_by_tx.lock() {
1054                    map.remove(&tx_id);
1055                }
1056                session.cancel(&tx_id);
1057            }
1058            MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1059                let tx_id: Arc<str> = tx.into();
1060                log::trace!("View window update: {}", tx_id);
1061                session.update_view_window(&tx_id, window);
1062            }
1063
1064            MykoMessage::Report(wrapped) => {
1065                // Extract tx from the report JSON
1066                let tx_id: Arc<str> = wrapped
1067                    .report
1068                    .get("tx")
1069                    .and_then(|v| v.as_str())
1070                    .unwrap_or("unknown")
1071                    .into();
1072                let report_id = &wrapped.report_id;
1073
1074                log::trace!(
1075                    "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1076                    session.client_id,
1077                    tx_id,
1078                    report_id,
1079                    session.subscription_count()
1080                );
1081
1082                // Look up the report registration
1083                if let Some(report_data) = handler_registry.get_report(report_id) {
1084                    // Parse the report JSON to the concrete type
1085                    let parsed = (report_data.parse)(wrapped.report.clone());
1086                    match parsed {
1087                        Ok(any_report) => {
1088                            let request_context = Arc::new(RequestContext::from_client(
1089                                tx_id.clone(),
1090                                session.client_id.clone(),
1091                                host_id,
1092                            ));
1093
1094                            // Create the cell using the factory (with host_id for context)
1095                            match (report_data.cell_factory)(any_report, request_context, ctx) {
1096                                Ok(cell) => {
1097                                    session.subscribe_report(
1098                                        tx_id,
1099                                        report_id.as_str().into(),
1100                                        cell,
1101                                    );
1102                                }
1103                                Err(e) => {
1104                                    log::error!(
1105                                        "Failed to create report cell for {}: {}",
1106                                        report_id,
1107                                        e
1108                                    );
1109                                }
1110                            }
1111                        }
1112                        Err(e) => {
1113                            log::error!(
1114                                "Failed to parse report {}: {} | payload: {}",
1115                                report_id,
1116                                e,
1117                                serde_json::to_string(&wrapped.report).unwrap_or_default()
1118                            );
1119                        }
1120                    }
1121                } else {
1122                    log::warn!("No registered handler for report: {}", report_id);
1123                }
1124            }
1125
1126            MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1127                log::trace!(
1128                    "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1129                    session.client_id,
1130                    tx_id,
1131                    session.subscription_count()
1132                );
1133                session.cancel(&tx_id.into());
1134            }
1135
1136            MykoMessage::Event(mut event) => {
1137                record_ws_ingest(std::slice::from_ref(&event));
1138                event.sanitize_null_bytes();
1139                normalize_incoming_event(&mut event, &session.client_id);
1140                if let Err(e) = ctx.apply_event(event) {
1141                    log::error!(
1142                        "Failed to apply event from client {}: {e}",
1143                        session.client_id
1144                    );
1145                }
1146            }
1147
1148            MykoMessage::EventBatch(mut events) => {
1149                record_ws_ingest(&events);
1150                let incoming = events.len();
1151                if incoming >= 64 {
1152                    log::trace!(
1153                        "Received event batch from client {} size={}",
1154                        session.client_id,
1155                        incoming
1156                    );
1157                }
1158                for event in &mut events {
1159                    event.sanitize_null_bytes();
1160                    normalize_incoming_event(event, &session.client_id);
1161                }
1162                match ctx.apply_event_batch(events) {
1163                    Ok(applied) => {
1164                        log::trace!(
1165                            "Applied event batch from client {} size={}",
1166                            session.client_id,
1167                            applied
1168                        );
1169                    }
1170                    Err(e) => {
1171                        log::error!(
1172                            "Failed to apply event batch from client {}: {}",
1173                            session.client_id,
1174                            e
1175                        );
1176                    }
1177                }
1178            }
1179
1180            MykoMessage::Command(wrapped) => {
1181                // Extract tx from the command JSON
1182                let tx_id: Arc<str> = wrapped
1183                    .command
1184                    .get("tx")
1185                    .and_then(|v| v.as_str())
1186                    .unwrap_or("unknown")
1187                    .into();
1188
1189                let command_id = &wrapped.command_id;
1190
1191                log::trace!("Command {} (tx: {})", command_id, tx_id,);
1192                let received_at = Instant::now();
1193                if let Ok(mut map) = command_started_by_tx.lock() {
1194                    map.insert(tx_id.clone(), received_at);
1195                }
1196                if let Err(e) = command_tx.send(CommandJob {
1197                    tx_id: tx_id.clone(),
1198                    command_id: wrapped.command_id.clone(),
1199                    command: wrapped.command.clone(),
1200                    received_at,
1201                }) {
1202                    log::error!(
1203                        "Failed to enqueue command {} for client {} tx={}: {}",
1204                        command_id,
1205                        session.client_id,
1206                        tx_id,
1207                        e
1208                    );
1209                    let error = MykoMessage::CommandError(CommandError {
1210                        tx: tx_id.to_string(),
1211                        command_id: command_id.to_string(),
1212                        message: "Command queue unavailable".to_string(),
1213                    });
1214                    if let Err(err) = priority_tx.try_send(error) {
1215                        drop_logger.on_drop("CommandError", &err);
1216                    }
1217                }
1218            }
1219
1220            MykoMessage::Ping(PingData { id, timestamp }) => {
1221                // Echo back the ping data
1222                let pong = MykoMessage::Ping(PingData { id, timestamp });
1223                if let Err(e) = priority_tx.try_send(pong) {
1224                    drop_logger.on_drop("Ping", &e);
1225                }
1226            }
1227
1228            // Response messages - these shouldn't come from clients.
1229            MykoMessage::QueryResponse(resp) => {
1230                log::warn!(
1231                    "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1232                    session.client_id,
1233                    resp.tx,
1234                    resp.sequence,
1235                    resp.upserts.len(),
1236                    resp.deletes.len(),
1237                    session.subscription_count()
1238                );
1239            }
1240            MykoMessage::QueryError(err) => {
1241                log::warn!(
1242                    "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1243                    session.client_id,
1244                    err.tx,
1245                    err.query_id,
1246                    err.message,
1247                    session.subscription_count()
1248                );
1249            }
1250            MykoMessage::ViewResponse(resp) => {
1251                log::warn!(
1252                    "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1253                    session.client_id,
1254                    resp.tx,
1255                    resp.sequence,
1256                    resp.upserts.len(),
1257                    resp.deletes.len(),
1258                    session.subscription_count()
1259                );
1260            }
1261            MykoMessage::ViewError(err) => {
1262                log::warn!(
1263                    "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1264                    session.client_id,
1265                    err.tx,
1266                    err.view_id,
1267                    err.message,
1268                    session.subscription_count()
1269                );
1270            }
1271            MykoMessage::ReportResponse(resp) => {
1272                log::warn!(
1273                    "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1274                    session.client_id,
1275                    resp.tx,
1276                    session.subscription_count()
1277                );
1278            }
1279            MykoMessage::ReportError(err) => {
1280                log::warn!(
1281                    "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1282                    session.client_id,
1283                    err.tx,
1284                    err.report_id,
1285                    err.message,
1286                    session.subscription_count()
1287                );
1288            }
1289            MykoMessage::CommandResponse(resp) => {
1290                if resp.tx.trim().is_empty() {
1291                    log::warn!(
1292                        "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1293                        session.client_id,
1294                        session.subscription_count()
1295                    );
1296                } else {
1297                    let correlated = outbound_commands_by_tx
1298                        .lock()
1299                        .ok()
1300                        .and_then(|mut map| map.remove(&resp.tx));
1301                    if let Some((command_id, started)) = correlated {
1302                        log::debug!(
1303                            "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1304                            session.client_id,
1305                            resp.tx,
1306                            command_id,
1307                            started.elapsed().as_millis(),
1308                            session.subscription_count()
1309                        );
1310                    } else {
1311                        log::warn!(
1312                            "Client command response without outbound match client={} tx={} active_subscriptions={}",
1313                            session.client_id,
1314                            resp.tx,
1315                            session.subscription_count()
1316                        );
1317                    }
1318                }
1319            }
1320            MykoMessage::CommandError(err) => {
1321                if err.tx.trim().is_empty() {
1322                    log::warn!(
1323                        "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1324                        session.client_id,
1325                        err.command_id,
1326                        err.message,
1327                        session.subscription_count()
1328                    );
1329                } else {
1330                    let correlated = outbound_commands_by_tx
1331                        .lock()
1332                        .ok()
1333                        .and_then(|mut map| map.remove(&err.tx));
1334                    if let Some((command_id, started)) = correlated {
1335                        log::warn!(
1336                            "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1337                            session.client_id,
1338                            err.tx,
1339                            err.command_id,
1340                            command_id,
1341                            err.message,
1342                            started.elapsed().as_millis(),
1343                            session.subscription_count()
1344                        );
1345                    } else {
1346                        log::warn!(
1347                            "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1348                            session.client_id,
1349                            err.tx,
1350                            err.command_id,
1351                            err.message,
1352                            session.subscription_count()
1353                        );
1354                    }
1355                }
1356            }
1357            MykoMessage::Benchmark(payload) => {
1358                let stats = ws_benchmark_stats();
1359                ensure_ws_benchmark_logger();
1360                stats.message_count.fetch_add(1, Ordering::Relaxed);
1361                // Estimate payload size from the JSON value
1362                let size = payload.to_string().len() as u64;
1363                stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1364            }
1365            MykoMessage::ProtocolSwitch { protocol } => {
1366                log::warn!(
1367                    "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1368                    session.client_id,
1369                    protocol,
1370                    session.subscription_count()
1371                );
1372            }
1373        }
1374
1375        Ok(())
1376    }
1377
1378    fn execute_command_job(
1379        ctx: Arc<CellServerCtx>,
1380        priority_tx: &mpsc::Sender<MykoMessage>,
1381        drop_logger: &DropLogger,
1382        client_id: Arc<str>,
1383        job: CommandJob,
1384    ) {
1385        let host_id = ctx.host_id;
1386        let started = Instant::now();
1387        let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1388        let command_id = job.command_id.clone();
1389
1390        let mut handler_found = false;
1391        for registration in inventory::iter::<CommandHandlerRegistration> {
1392            if registration.command_id == command_id {
1393                handler_found = true;
1394                let executor = (registration.factory)();
1395
1396                let req = Arc::new(RequestContext::from_client(
1397                    job.tx_id.clone(),
1398                    client_id.clone(),
1399                    host_id,
1400                ));
1401                let cmd_id: Arc<str> = Arc::from(command_id.clone());
1402                let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1403                let execute_started = Instant::now();
1404
1405                match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1406                    Ok(result) => {
1407                        let response = MykoMessage::CommandResponse(CommandResponse {
1408                            response: result,
1409                            tx: job.tx_id.to_string(),
1410                        });
1411                        if let Err(e) = priority_tx.try_send(response) {
1412                            drop_logger.on_drop("CommandResponse", &e);
1413                        }
1414                    }
1415                    Err(e) => {
1416                        let error = MykoMessage::CommandError(CommandError {
1417                            tx: job.tx_id.to_string(),
1418                            command_id: command_id.clone(),
1419                            message: e.message,
1420                        });
1421                        if let Err(err) = priority_tx.try_send(error) {
1422                            drop_logger.on_drop("CommandError", &err);
1423                        }
1424                    }
1425                }
1426                let execute_ms = execute_started.elapsed().as_millis();
1427                let total_ms = job.received_at.elapsed().as_millis();
1428                log::trace!(
1429                    target: "myko_server::ws_perf",
1430                    "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1431                    client_id,
1432                    job.tx_id,
1433                    command_id,
1434                    queue_wait_ms,
1435                    execute_ms,
1436                    total_ms
1437                );
1438                break;
1439            }
1440        }
1441
1442        if !handler_found {
1443            log::warn!("No registered handler for command: {}", command_id);
1444            let error = MykoMessage::CommandError(CommandError {
1445                tx: job.tx_id.to_string(),
1446                command_id: command_id.clone(),
1447                message: format!("Command handler not found: {}", command_id),
1448            });
1449            if let Err(e) = priority_tx.try_send(error) {
1450                drop_logger.on_drop("CommandError", &e);
1451            }
1452        }
1453
1454        if !handler_found {
1455            log::debug!(
1456                target: "myko_server::ws_perf",
1457                "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1458                client_id,
1459                job.tx_id,
1460                command_id,
1461                queue_wait_ms,
1462                job.received_at.elapsed().as_millis()
1463            );
1464        }
1465    }
1466}
1467
1468/// Channel-based WebSocket writer.
1469///
1470/// Sends messages through an mpsc channel which are then
1471/// forwarded to the actual WebSocket.
1472struct ChannelWriter {
1473    tx: mpsc::Sender<OutboundMessage>,
1474    deferred_tx: mpsc::Sender<DeferredOutbound>,
1475    drop_logger: Arc<DropLogger>,
1476    use_binary_writer: Arc<AtomicBool>,
1477}
1478
1479impl WsWriter for ChannelWriter {
1480    fn send(&self, msg: MykoMessage) {
1481        // Use try_send since we're in sync context
1482        if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1483            self.drop_logger.on_drop("message", &e);
1484        }
1485    }
1486
1487    fn protocol(&self) -> myko::client::MykoProtocol {
1488        if self.use_binary_writer.load(Ordering::SeqCst) {
1489            myko::client::MykoProtocol::MSGPACK
1490        } else {
1491            myko::client::MykoProtocol::JSON
1492        }
1493    }
1494
1495    fn send_serialized_command(
1496        &self,
1497        tx: Arc<str>,
1498        command_id: String,
1499        payload: EncodedCommandMessage,
1500    ) {
1501        if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1502            tx,
1503            command_id,
1504            payload,
1505        }) {
1506            self.drop_logger.on_drop("serialized_command", &e);
1507        }
1508    }
1509
1510    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1511        if let Err(e) = self
1512            .deferred_tx
1513            .try_send(DeferredOutbound::Report(tx, output))
1514        {
1515            self.drop_logger.on_drop("ReportResponseDeferred", &e);
1516        }
1517    }
1518
1519    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1520        if let Err(e) = self
1521            .deferred_tx
1522            .try_send(DeferredOutbound::Query { response, is_view })
1523        {
1524            self.drop_logger.on_drop("QueryResponseDeferred", &e);
1525        }
1526    }
1527}
1528
#[cfg(test)]
mod tests {
    use super::*;

    /// A message passed to `ChannelWriter::send` must arrive on the outbound
    /// channel wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is bound (not `_`) so it lives until the end of the test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);

        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        let ping = PingData {
            id: "test".to_string(),
            timestamp: 0,
        };
        writer.send(MykoMessage::Ping(ping));

        let delivered = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            delivered,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }
}