1use std::{
6 collections::HashMap,
7 net::SocketAddr,
8 sync::{
9 Arc, Mutex, OnceLock,
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use dashmap::DashMap;
17use futures_util::{SinkExt, StreamExt};
18use hyphae::SelectExt;
19use myko::{
20 WS_MAX_FRAME_SIZE_BYTES, WS_MAX_MESSAGE_SIZE_BYTES,
21 command::{CommandContext, CommandHandlerRegistration},
22 entities::client::{Client, ClientId},
23 relationship::{iter_client_id_registrations, iter_fallback_to_id_registrations},
24 report::AnyOutput,
25 request::RequestContext,
26 server::{
27 CellServerCtx, ClientSession, PendingQueryResponse, WsWriter,
28 client_registry::try_client_registry,
29 },
30 wire::{
31 CancelSubscription, CommandError, CommandResponse, EncodedCommandMessage, MEvent,
32 MEventType, MykoMessage, PingData, QueryWindowUpdate, ViewError, ViewWindowUpdate,
33 },
34};
35use tokio::{net::TcpStream, sync::mpsc, time::interval};
36use tokio_tungstenite::{
37 accept_async_with_config,
38 tungstenite::{Message, protocol::WebSocketConfig},
39};
40use uuid::Uuid;
41
/// Text-frame sentinel a client sends to explicitly request the msgpack
/// (binary) protocol. `WsHandler::handle_connection` compares every incoming
/// text frame against this value before attempting to parse it as JSON.
const SWITCH_TO_MSGPACK: &str = "myko:switch-to-msgpack";
45
/// Per-entity-type counters of events received over WebSocket connections.
///
/// Counters are incremented by `record_ws_ingest` and swapped back to zero by
/// the logger thread started in `ensure_ws_ingest_logger`.
struct WsIngestStats {
    // Keyed by the event's `item_type`; the value is the count since the last logger swap.
    counts_by_type: DashMap<Arc<str>, Arc<AtomicU64>>,
}
49
/// Message/byte counters reported once per second by the benchmark logger
/// thread (see `ensure_ws_benchmark_logger`), which swaps both back to zero
/// on every tick.
struct WsBenchmarkStats {
    // Number of messages counted since the last logger tick.
    message_count: AtomicU64,
    // Sum of bytes counted since the last logger tick.
    total_bytes: AtomicU64,
}
54
/// Process-wide ingest counters, lazily created by `ws_ingest_stats`.
static WS_INGEST_STATS: OnceLock<Arc<WsIngestStats>> = OnceLock::new();
/// Flipped from `false` to `true` by `ensure_ws_ingest_logger` so that only
/// one caller goes on to start the ingest logger thread.
static WS_INGEST_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
/// Process-wide benchmark counters, lazily created by `ws_benchmark_stats`.
static WS_BENCHMARK_STATS: OnceLock<Arc<WsBenchmarkStats>> = OnceLock::new();
/// Flipped from `false` to `true` by `ensure_ws_benchmark_logger` so that only
/// one caller goes on to start the benchmark logger thread.
static WS_BENCHMARK_LOGGER_STARTED: AtomicBool = AtomicBool::new(false);
59
60fn ws_benchmark_stats() -> Arc<WsBenchmarkStats> {
61 WS_BENCHMARK_STATS
62 .get_or_init(|| {
63 Arc::new(WsBenchmarkStats {
64 message_count: AtomicU64::new(0),
65 total_bytes: AtomicU64::new(0),
66 })
67 })
68 .clone()
69}
70
71fn ensure_ws_benchmark_logger() {
72 if WS_BENCHMARK_LOGGER_STARTED
73 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
74 .is_err()
75 {
76 return;
77 }
78
79 let stats = ws_benchmark_stats();
80 thread::Builder::new()
81 .name("ws-benchmark-logger".into())
82 .spawn(move || {
83 loop {
84 thread::sleep(Duration::from_secs(1));
85
86 let count = stats.message_count.swap(0, Ordering::Relaxed);
87 let bytes = stats.total_bytes.swap(0, Ordering::Relaxed);
88
89 if count == 0 {
90 continue;
91 }
92
93 log::info!(
94 "WebSocket benchmark last_1s messages={} bytes={} avg_bytes={}",
95 count,
96 bytes,
97 if count > 0 { bytes / count } else { 0 },
98 );
99 }
100 })
101 .expect("failed to spawn websocket benchmark logger thread");
102}
103
104fn ws_ingest_stats() -> Arc<WsIngestStats> {
105 WS_INGEST_STATS
106 .get_or_init(|| {
107 Arc::new(WsIngestStats {
108 counts_by_type: DashMap::new(),
109 })
110 })
111 .clone()
112}
113
114fn ensure_ws_ingest_logger() {
115 if WS_INGEST_LOGGER_STARTED
116 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
117 .is_err()
118 {
119 return;
120 }
121
122 let stats = ws_ingest_stats();
123 thread::Builder::new()
124 .name("ws-ingest-logger".into())
125 .spawn(move || {
126 loop {
127 thread::sleep(Duration::from_secs(1));
128
129 let mut per_type = Vec::new();
130 let mut total = 0u64;
131 for entry in &stats.counts_by_type {
132 let count = entry.value().swap(0, Ordering::Relaxed);
133 if count == 0 {
134 continue;
135 }
136 total += count;
137 per_type.push((entry.key().clone(), count));
138 }
139
140 if total == 0 {
141 continue;
142 }
143
144 per_type.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
145 let details = per_type
146 .into_iter()
147 .map(|(entity_type, count)| format!("{entity_type}={count}"))
148 .collect::<Vec<_>>()
149 .join(" ");
150
151 log::info!("WebSocket ingest last_1s total={} {}", total, details);
152 }
153 })
154 .expect("failed to spawn websocket ingest logger thread");
155}
156
157fn record_ws_ingest(events: &[MEvent]) {
158 if events.is_empty() {
159 return;
160 }
161
162 let stats = ws_ingest_stats();
163 for event in events {
164 let counter = stats
165 .counts_by_type
166 .entry(Arc::<str>::from(event.item_type.as_str()))
167 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
168 .clone();
169 counter.fetch_add(1, Ordering::Relaxed);
170 }
171}
172
173fn normalize_incoming_event(event: &mut MEvent, client_id: &str) {
174 if event.change_type != MEventType::SET {
175 return;
176 }
177
178 for reg in iter_client_id_registrations() {
180 if reg.entity_type == event.item_type {
181 if let Some(obj) = event.item.as_object_mut() {
182 obj.insert(
183 reg.field_name_json.to_string(),
184 serde_json::Value::String(client_id.to_string()),
185 );
186 }
187 break;
188 }
189 }
190
191 if let Some(obj) = event.item.as_object_mut()
194 && let Some(id) = obj.get("id").and_then(|v| v.as_str()).map(String::from)
195 {
196 for reg in iter_fallback_to_id_registrations() {
197 if reg.entity_type == event.item_type {
198 let field = reg.field_name_json;
199 if matches!(obj.get(field), None | Some(serde_json::Value::Null)) {
200 obj.insert(field.to_string(), serde_json::Value::String(id.clone()));
201 }
202 }
203 }
204 }
205}
206
207struct DropLogger {
212 client_id: Arc<str>,
213 dropped: std::sync::atomic::AtomicU64,
214 last_log_ms: std::sync::atomic::AtomicU64,
215}
216
217impl DropLogger {
218 fn new(client_id: Arc<str>) -> Self {
219 Self {
220 client_id,
221 dropped: std::sync::atomic::AtomicU64::new(0),
222 last_log_ms: std::sync::atomic::AtomicU64::new(0),
223 }
224 }
225
226 fn on_drop(&self, kind: &'static str, err: &dyn std::fmt::Display) {
227 use std::sync::atomic::Ordering;
228
229 self.dropped.fetch_add(1, Ordering::Relaxed);
230
231 let now_ms = std::time::SystemTime::now()
233 .duration_since(std::time::UNIX_EPOCH)
234 .unwrap_or_default()
235 .as_millis() as u64;
236 let last_ms = self.last_log_ms.load(Ordering::Relaxed);
237 if now_ms.saturating_sub(last_ms) < 1000 {
238 return;
239 }
240
241 if self
242 .last_log_ms
243 .compare_exchange(last_ms, now_ms, Ordering::Relaxed, Ordering::Relaxed)
244 .is_err()
245 {
246 return;
247 }
248
249 let n = self.dropped.swap(0, Ordering::Relaxed);
250 log::warn!(
251 "WebSocket send buffer full; dropped {} message(s) for client {} (latest: {}): {}",
252 n,
253 self.client_id,
254 kind,
255 err
256 );
257 }
258}
259
/// A client command queued for execution on the per-connection command task.
///
/// Built in `WsHandler::handle_message` and sent over the unbounded command
/// channel; the command task passes it to `execute_command_job` on a blocking
/// worker.
struct CommandJob {
    // Transaction id from the command's "tx" field ("unknown" when absent).
    tx_id: Arc<str>,
    // Identifier of the command, copied from the wrapped message.
    command_id: String,
    // JSON command body, copied from the wrapped message.
    command: serde_json::Value,
    // When the command message was handled by the connection loop.
    received_at: Instant,
}
266
/// Result of a query/view cell factory that ran on a blocking worker.
///
/// Sent back to the connection loop, which installs the subscription on the
/// client session (`subscribe_query` / `subscribe_view_with_id`).
enum SubscriptionReady {
    /// A query cell map is ready to be subscribed under `tx_id`.
    Query {
        tx_id: Arc<str>,
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window forwarded from the client's request.
        window: Option<myko::wire::QueryWindow>,
    },
    /// A view cell map is ready to be subscribed under `tx_id` for `view_id`.
    View {
        tx_id: Arc<str>,
        view_id: Arc<str>,
        cellmap: hyphae::CellMap<Arc<str>, Arc<dyn myko::item::AnyItem>, hyphae::CellImmutable>,
        // Optional window forwarded from the client's request.
        window: Option<myko::wire::QueryWindow>,
    },
}
281
/// An item on the normal-priority outbound channel consumed by the writer task.
enum OutboundMessage {
    /// A message the writer task serializes itself (msgpack or JSON,
    /// depending on the connection's binary flag at send time).
    Message(MykoMessage),
    /// A command that was already encoded; the writer sends the payload as-is
    /// (text frame for JSON, binary frame for msgpack) and records
    /// `tx`/`command_id` in its outbound-command map.
    SerializedCommand {
        tx: Arc<str>,
        command_id: String,
        payload: EncodedCommandMessage,
    },
}
290
/// Outbound items whose conversion to a wire message is deferred to the
/// writer task.
enum DeferredOutbound {
    /// Report output for the given tx; the writer calls `to_value()` on it and
    /// wraps the result in a `ReportResponse`.
    Report(Arc<str>, Arc<dyn AnyOutput>),
    /// Pending query/view response; the writer calls `into_wire()` on it and
    /// wraps the result in a `ViewResponse` when `is_view` is true, otherwise
    /// in a `QueryResponse`.
    Query {
        response: PendingQueryResponse,
        is_view: bool,
    },
}
298
/// Stateless namespace for the WebSocket connection handling routines; all
/// functionality lives in associated functions such as
/// `WsHandler::handle_connection`.
pub struct WsHandler;
301
302impl WsHandler {
303 #[allow(clippy::too_many_arguments)]
305 pub async fn handle_connection(
306 stream: TcpStream,
307 addr: SocketAddr,
308 ctx: Arc<CellServerCtx>,
309 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
310 ensure_ws_ingest_logger();
311
312 let host_id = ctx.host_id;
313
314 let mut ws_config = WebSocketConfig::default();
315 ws_config.max_message_size = Some(WS_MAX_MESSAGE_SIZE_BYTES);
316 ws_config.max_frame_size = Some(WS_MAX_FRAME_SIZE_BYTES);
317 let ws_stream = accept_async_with_config(stream, Some(ws_config)).await?;
318 let (mut write, mut read) = ws_stream.split();
319
320 let (tx, mut rx) = mpsc::channel::<OutboundMessage>(10_000);
323 let (deferred_tx, mut deferred_rx) = mpsc::channel::<DeferredOutbound>(10_000);
324 let (priority_tx, mut priority_rx) = mpsc::channel::<MykoMessage>(1_000);
325 let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandJob>();
326 let (subscribe_tx, mut subscribe_rx) = mpsc::unbounded_channel::<SubscriptionReady>();
327
328 let use_binary = Arc::new(AtomicBool::new(false));
330
331 let client_id: Arc<str> = Uuid::new_v4().to_string().into();
333 let drop_logger = Arc::new(DropLogger::new(client_id.clone()));
334 let writer = ChannelWriter {
335 tx: tx.clone(),
336 deferred_tx: deferred_tx.clone(),
337 drop_logger: drop_logger.clone(),
338 use_binary_writer: use_binary.clone(),
339 };
340
341 let writer_arc: Arc<dyn WsWriter> = Arc::new(ChannelWriter {
343 tx: tx.clone(),
344 deferred_tx: deferred_tx.clone(),
345 drop_logger: drop_logger.clone(),
346 use_binary_writer: use_binary.clone(),
347 });
348 if let Some(registry) = try_client_registry() {
349 registry.register(client_id.clone(), writer_arc);
350 }
351
352 let mut session = ClientSession::new(client_id.clone(), writer);
353
354 let use_binary_writer = use_binary.clone();
355 let query_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
356 Arc::new(Mutex::new(HashMap::new()));
357 let view_ids_by_tx: Arc<Mutex<HashMap<Arc<str>, Arc<str>>>> =
358 Arc::new(Mutex::new(HashMap::new()));
359 let subscribe_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
360 Arc::new(Mutex::new(HashMap::new()));
361 let command_started_by_tx: Arc<Mutex<HashMap<Arc<str>, Instant>>> =
362 Arc::new(Mutex::new(HashMap::new()));
363 let outbound_commands_by_tx: Arc<Mutex<HashMap<String, (String, Instant)>>> =
364 Arc::new(Mutex::new(HashMap::new()));
365
366 let outbound_commands_by_tx_writer = outbound_commands_by_tx.clone();
367
368 let client_entity = Client {
370 id: ClientId(client_id.clone()),
371 server_id: host_id.to_string().into(),
372 address: Some(Arc::from(addr.to_string())),
373 windback: None,
374 };
375 if let Err(e) = ctx.set(&client_entity) {
376 log::error!("Failed to persist client entity: {e}");
377 }
378
379 log::info!("Client connected: {} from {}", client_id, addr);
380
381 let write_ctx = ctx.clone();
382 let write_client_id = client_id.clone();
383 let write_addr = addr;
384 let command_ctx = ctx.clone();
385 let command_priority_tx = priority_tx.clone();
386 let command_drop_logger = drop_logger.clone();
387 let command_client_id = client_id.clone();
388
389 let write_task = tokio::spawn(async move {
391 let _ctx = write_ctx;
392 let mut normal_open = true;
393 let mut priority_open = true;
394 let mut deferred_open = true;
395 while normal_open || priority_open || deferred_open {
396 let msg = tokio::select! {
397 biased;
398 maybe = priority_rx.recv(), if priority_open => {
399 match maybe {
400 Some(msg) => OutboundMessage::Message(msg),
401 None => {
402 priority_open = false;
403 continue;
404 }
405 }
406 }
407 maybe = deferred_rx.recv(), if deferred_open => {
408 match maybe {
409 Some(DeferredOutbound::Report(tx, output)) => {
410 OutboundMessage::Message(MykoMessage::ReportResponse(myko::wire::ReportResponse {
411 response: output.to_value(),
412 tx: tx.to_string(),
413 }))
414 }
415 Some(DeferredOutbound::Query { response, is_view }) => {
416 if is_view {
417 OutboundMessage::Message(MykoMessage::ViewResponse(response.into_wire()))
418 } else {
419 OutboundMessage::Message(MykoMessage::QueryResponse(response.into_wire()))
420 }
421 }
422 None => {
423 deferred_open = false;
424 continue;
425 }
426 }
427 }
428 maybe = rx.recv(), if normal_open => {
429 match maybe {
430 Some(msg) => msg,
431 None => {
432 normal_open = false;
433 continue;
434 }
435 }
436 }
437 };
438 let (kind, tx_id, seq, _upserts, _deletes, _total_count) = match &msg {
439 OutboundMessage::SerializedCommand { tx, .. } => {
440 ("command", Some(tx.clone()), None, None, None, None)
441 }
442 OutboundMessage::Message(msg) => match msg {
443 MykoMessage::ViewResponse(r) => (
444 "view_response",
445 Some(r.tx.clone()),
446 Some(r.sequence),
447 Some(r.upserts.len()),
448 Some(r.deletes.len()),
449 r.total_count,
450 ),
451 MykoMessage::QueryResponse(r) => (
452 "query_response",
453 Some(r.tx.clone()),
454 Some(r.sequence),
455 Some(r.upserts.len()),
456 Some(r.deletes.len()),
457 r.total_count,
458 ),
459 MykoMessage::CommandResponse(r) => (
460 "command_response",
461 Some(Arc::<str>::from(r.tx.clone())),
462 None,
463 None,
464 None,
465 None,
466 ),
467 MykoMessage::CommandError(r) => (
468 "command_error",
469 Some(Arc::<str>::from(r.tx.clone())),
470 None,
471 None,
472 None,
473 None,
474 ),
475 _ => ("other", None, None, None, None, None),
476 },
477 };
478
479 match &msg {
480 OutboundMessage::SerializedCommand { tx, command_id, .. } => {
481 if !tx.trim().is_empty()
482 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
483 {
484 map.insert(tx.to_string(), (command_id.clone(), Instant::now()));
485 }
486 }
487 OutboundMessage::Message(MykoMessage::Command(wrapped)) => {
488 if let Some(tx_id) = wrapped.command.get("tx").and_then(|v| v.as_str())
489 && !tx_id.trim().is_empty()
490 && let Ok(mut map) = outbound_commands_by_tx_writer.lock()
491 {
492 map.insert(
493 tx_id.to_string(),
494 (wrapped.command_id.clone(), Instant::now()),
495 );
496 }
497 }
498 _ => {}
499 }
500
501 let ws_msg = match &msg {
502 OutboundMessage::SerializedCommand {
503 payload: EncodedCommandMessage::Json(json),
504 ..
505 } => Message::Text(json.clone().into()),
506 OutboundMessage::SerializedCommand {
507 payload: EncodedCommandMessage::Msgpack(bytes),
508 ..
509 } => Message::Binary(bytes.clone().into()),
510 OutboundMessage::Message(msg) if use_binary_writer.load(Ordering::SeqCst) => {
511 match rmp_serde::to_vec(msg) {
512 Ok(bytes) => Message::Binary(bytes.into()),
513 Err(e) => {
514 log::error!("Failed to serialize message to msgpack: {}", e);
515 continue;
516 }
517 }
518 }
519 OutboundMessage::Message(msg) => match serde_json::to_string(msg) {
520 Ok(json) => Message::Text(json.into()),
521 Err(e) => {
522 log::error!("Failed to serialize message to JSON: {}", e);
523 continue;
524 }
525 },
526 };
527 let payload_bytes = match &ws_msg {
528 Message::Binary(b) => b.len(),
529 Message::Text(t) => t.len(),
530 _ => 0,
531 };
532
533 if let Err(err) = write.send(ws_msg).await {
534 log::error!(
535 "WebSocket write failed for client {} from {} kind={} tx={:?} seq={:?} payload_bytes={} binary={}: {}",
536 write_client_id,
537 write_addr,
538 kind,
539 tx_id,
540 seq,
541 payload_bytes,
542 use_binary_writer.load(Ordering::SeqCst),
543 err
544 );
545 break;
546 }
547 }
548 log::warn!(
549 "WebSocket writer task exiting for client {} from {} normal_open={} priority_open={} deferred_open={}",
550 write_client_id,
551 write_addr,
552 normal_open,
553 priority_open,
554 deferred_open
555 );
556 });
557
558 let command_started_cleanup = command_started_by_tx.clone();
561 let command_task = tokio::spawn(async move {
562 while let Some(job) = command_rx.recv().await {
563 let command_ctx = command_ctx.clone();
564 let command_priority_tx = command_priority_tx.clone();
565 let command_drop_logger = command_drop_logger.clone();
566 let command_client_id = command_client_id.clone();
567 let tx_id = job.tx_id.clone();
568 let started_map = command_started_cleanup.clone();
569 match tokio::task::spawn_blocking(move || {
570 Self::execute_command_job(
571 command_ctx,
572 &command_priority_tx,
573 command_drop_logger.as_ref(),
574 command_client_id,
575 job,
576 );
577 })
578 .await
579 {
580 Ok(()) => {}
581 Err(e) => {
582 log::error!("Command worker panicked: {}", e);
583 }
584 }
585 if let Ok(mut map) = started_map.lock() {
587 map.remove(&tx_id);
588 }
589 }
590 });
591
592 let mut outbound_ttl_interval = interval(Duration::from_secs(10));
596 outbound_ttl_interval.tick().await; loop {
598 tokio::select! {
599 Some(ready) = subscribe_rx.recv() => {
601 let tx_id = match &ready {
602 SubscriptionReady::Query { tx_id, .. } => tx_id.clone(),
603 SubscriptionReady::View { tx_id, .. } => tx_id.clone(),
604 };
605 if let Ok(mut map) = subscribe_started_by_tx.lock() {
606 map.remove(&tx_id);
607 }
608 match ready {
609 SubscriptionReady::Query { tx_id, cellmap, window } => {
610 session.subscribe_query(tx_id, cellmap, window);
611 }
612 SubscriptionReady::View { tx_id, view_id, cellmap, window } => {
613 session.subscribe_view_with_id(tx_id, view_id, cellmap, window);
614 }
615 }
616 }
617 _ = outbound_ttl_interval.tick() => {
621 if let Ok(mut map) = outbound_commands_by_tx.lock() {
622 let before = map.len();
623 map.retain(|_, (_, started)| started.elapsed() < Duration::from_secs(10));
624 let removed = before - map.len();
625 if removed > 0 {
626 log::debug!(
627 "Outbound command TTL sweep client={}: removed {} stale entries, {} remaining",
628 session.client_id,
629 removed,
630 map.len()
631 );
632 }
633 }
634 }
635 msg = read.next() => {
637 let Some(msg) = msg else { break };
638 let ctx = ctx.clone();
639 let msg = match msg {
640 Ok(m) => m,
641 Err(e) => {
642 log::error!("WebSocket read error from {}: {}", client_id, e);
643 break;
644 }
645 };
646
647 match msg {
648 Message::Binary(data) => {
649 if !use_binary.load(Ordering::SeqCst) {
650 log::debug!(
651 "Client {} switched to binary (msgpack) protocol via auto-detect",
652 client_id
653 );
654 use_binary.store(true, Ordering::SeqCst);
655 }
656
657 match rmp_serde::from_slice::<MykoMessage>(&data) {
658 Ok(myko_msg) => {
659 if let Err(e) = Self::handle_message(
660 &mut session,
661 ctx,
662 &priority_tx,
663 &drop_logger,
664 &query_ids_by_tx,
665 &view_ids_by_tx,
666 &subscribe_started_by_tx,
667 &command_started_by_tx,
668 &outbound_commands_by_tx,
669 &command_tx,
670 &subscribe_tx,
671 myko_msg,
672 ) {
673 log::error!("Error handling message: {}", e);
674 }
675 tokio::task::yield_now().await;
676 }
677 Err(e) => {
678 log::warn!("Failed to parse message from {}: {}", client_id, e);
679 }
680 }
681 }
682 Message::Text(text) => {
683 if text == SWITCH_TO_MSGPACK {
684 log::debug!(
685 "Client {} switched to binary (msgpack) protocol via explicit request",
686 client_id
687 );
688 if let Err(e) = priority_tx.try_send(MykoMessage::ProtocolSwitch {
689 protocol: "msgpack".into(),
690 }) {
691 drop_logger.on_drop("ProtocolSwitch", &e);
692 }
693 use_binary.store(true, Ordering::SeqCst);
694 continue;
695 }
696
697 match serde_json::from_str::<MykoMessage>(&text) {
698 Ok(myko_msg) => {
699 if let Err(e) = Self::handle_message(
700 &mut session,
701 ctx,
702 &priority_tx,
703 &drop_logger,
704 &query_ids_by_tx,
705 &view_ids_by_tx,
706 &subscribe_started_by_tx,
707 &command_started_by_tx,
708 &outbound_commands_by_tx,
709 &command_tx,
710 &subscribe_tx,
711 myko_msg,
712 ) {
713 log::error!("Error handling message: {}", e);
714 }
715 tokio::task::yield_now().await;
716 }
717 Err(e) => {
718 log::warn!(
719 "Failed to parse JSON message from {}: {} | raw: {}",
720 client_id,
721 e,
722 if text.len() > 1000 {
723 &text[..1000]
724 } else {
725 &text
726 }
727 );
728 }
729 }
730 }
731 Message::Ping(data) => {
732 log::trace!("Ping from {}", client_id);
733 let _ = data;
734 }
735 Message::Pong(_) => {
736 log::trace!("Pong from {}", client_id);
737 }
738 Message::Close(frame) => {
739 log::warn!("Client {} sent close frame: {:?}", client_id, frame);
740 break;
741 }
742 Message::Frame(_) => {}
743 }
744 }
745 }
746 }
747
748 drop(session); write_task.abort();
751 command_task.abort();
752
753 if let Some(registry) = try_client_registry() {
755 registry.unregister(&client_id);
756 }
757
758 if let Err(e) = ctx.del(&client_entity) {
760 log::error!("Failed to delete client entity: {e}");
761 }
762
763 log::info!("Client disconnected: {} from {}", client_id, addr);
764
765 Ok(())
766 }
767
768 #[allow(clippy::too_many_arguments)]
770 fn handle_message<W: WsWriter>(
771 session: &mut ClientSession<W>,
772 ctx: Arc<CellServerCtx>,
773 priority_tx: &mpsc::Sender<MykoMessage>,
774 drop_logger: &Arc<DropLogger>,
775 query_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
776 view_ids_by_tx: &Arc<Mutex<HashMap<Arc<str>, Arc<str>>>>,
777 subscribe_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
778 command_started_by_tx: &Arc<Mutex<HashMap<Arc<str>, Instant>>>,
779 outbound_commands_by_tx: &Arc<Mutex<HashMap<String, (String, Instant)>>>,
780 command_tx: &mpsc::UnboundedSender<CommandJob>,
781 subscribe_tx: &mpsc::UnboundedSender<SubscriptionReady>,
782 msg: MykoMessage,
783 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
784 let handler_registry = ctx.handler_registry.clone();
785
786 let registry = ctx.registry.clone();
787
788 let host_id = ctx.host_id;
789
790 match msg {
791 MykoMessage::Query(wrapped) => {
792 let tx_id: Arc<str> = wrapped
794 .query
795 .get("tx")
796 .and_then(|v| v.as_str())
797 .unwrap_or("unknown")
798 .into();
799 let query_id = &wrapped.query_id;
800 let entity_type = &wrapped.query_item_type;
801
802 if session.has_subscription(&tx_id) {
805 log::debug!(
806 "Ignoring duplicate query subscribe client={} tx={} query_id={} item_type={}",
807 session.client_id,
808 tx_id,
809 query_id,
810 entity_type
811 );
812 return Ok(());
813 }
814 if let Ok(mut map) = query_ids_by_tx.lock() {
815 map.insert(tx_id.clone(), query_id.clone());
816 }
817 if let Ok(mut map) = subscribe_started_by_tx.lock() {
818 map.entry(tx_id.clone()).or_insert_with(Instant::now);
819 }
820
821 log::trace!("Query {} for {} (tx: {})", query_id, entity_type, tx_id);
822 log::trace!(
823 "Query subscribe request client={} tx={} query_id={} item_type={} window={} active_subscriptions_before={}",
824 session.client_id,
825 tx_id,
826 query_id,
827 entity_type,
828 wrapped.window.is_some(),
829 session.subscription_count()
830 );
831
832 let request_context = Arc::new(RequestContext::from_client(
833 tx_id.clone(),
834 session.client_id.clone(),
835 host_id,
836 ));
837
838 if let Some(query_data) = handler_registry.get_query(query_id) {
839 let parsed = (query_data.parse)(wrapped.query.clone());
840 match parsed {
841 Ok(any_query) => {
842 let cell_factory = query_data.cell_factory;
845 let registry = registry.clone();
846 let request_context = request_context.clone();
847 let ctx = ctx.clone();
848 let window = wrapped.window.clone();
849 let query_id = query_id.clone();
850 let sub_tx = subscribe_tx.clone();
851 tokio::task::spawn_blocking(move || {
852 match cell_factory(any_query, registry, request_context, Some(ctx))
853 {
854 Ok(filtered_cellmap) => {
855 let _ = sub_tx.send(SubscriptionReady::Query {
856 tx_id,
857 cellmap: filtered_cellmap,
858 window,
859 });
860 }
861 Err(e) => {
862 log::error!(
863 "Failed to create query cell for {}: {}",
864 query_id,
865 e
866 );
867 }
868 }
869 });
870 }
871 Err(e) => {
872 log::error!(
873 "Failed to parse query {}: {} | payload: {}",
874 query_id,
875 e,
876 serde_json::to_string(&wrapped.query).unwrap_or_default()
877 );
878 }
879 }
880 } else {
881 log::warn!(
883 "No registered query handler for {}, falling back to select all",
884 query_id
885 );
886 let cellmap = registry.get_or_create(entity_type).select(|_| true);
887 session.subscribe_query(tx_id, cellmap, wrapped.window.clone());
888 }
889 }
890
891 MykoMessage::View(wrapped) => {
892 let tx_id: Arc<str> = wrapped
893 .view
894 .get("tx")
895 .and_then(|v| v.as_str())
896 .unwrap_or("unknown")
897 .into();
898 let view_id = &wrapped.view_id;
899 let item_type = &wrapped.view_item_type;
900
901 if session.has_subscription(&tx_id) {
903 log::debug!(
904 "Ignoring duplicate view subscribe client={} tx={} view_id={} item_type={}",
905 session.client_id,
906 tx_id,
907 view_id,
908 item_type
909 );
910 return Ok(());
911 }
912 if let Ok(mut map) = view_ids_by_tx.lock() {
913 map.insert(tx_id.clone(), view_id.clone());
914 }
915 if let Ok(mut map) = subscribe_started_by_tx.lock() {
916 map.entry(tx_id.clone()).or_insert_with(Instant::now);
917 }
918
919 log::trace!("View {} for {} (tx: {})", view_id, item_type, tx_id);
920 log::trace!(
921 "View subscribe request client={} tx={} view_id={} item_type={} window={:?}",
922 session.client_id,
923 tx_id,
924 view_id,
925 item_type,
926 wrapped.window
927 );
928
929 let request_context = Arc::new(RequestContext::from_client(
930 tx_id.clone(),
931 session.client_id.clone(),
932 host_id,
933 ));
934
935 if let Some(view_data) = handler_registry.get_view(view_id) {
936 let parsed = (view_data.parse)(wrapped.view.clone());
937 match parsed {
938 Ok(any_view) => {
939 log::trace!(
940 "View parsed successfully client={} tx={} view_id={}",
941 session.client_id,
942 tx_id,
943 view_id
944 );
945 let cell_factory = view_data.cell_factory;
948 let registry = registry.clone();
949 let ctx = ctx.clone();
950 let window = wrapped.window.clone();
951 let view_id_clone = view_id.clone();
952 let sub_tx = subscribe_tx.clone();
953 let priority_tx = priority_tx.clone();
954 let drop_logger = drop_logger.clone();
955 tokio::task::spawn_blocking(move || {
956 match cell_factory(any_view, registry, request_context, ctx) {
957 Ok(filtered_cellmap) => {
958 log::trace!(
959 "View cell factory succeeded tx={} view_id={}",
960 tx_id,
961 view_id_clone
962 );
963 let _ = sub_tx.send(SubscriptionReady::View {
964 tx_id,
965 view_id: view_id_clone,
966 cellmap: filtered_cellmap,
967 window,
968 });
969 }
970 Err(e) => {
971 log::error!(
972 "Failed to create view cell for {}: {}",
973 view_id_clone,
974 e
975 );
976 if let Err(err) = priority_tx.try_send(
977 MykoMessage::ViewError(ViewError {
978 tx: tx_id.to_string(),
979 view_id: view_id_clone.to_string(),
980 message: e,
981 }),
982 ) {
983 drop_logger.on_drop("ViewError", &err);
984 }
985 }
986 }
987 });
988 }
989 Err(e) => {
990 let message = format!("Failed to parse view {}: {}", view_id, e);
991 log::error!(
992 "{} | payload: {}",
993 message,
994 serde_json::to_string(&wrapped.view).unwrap_or_default()
995 );
996 if let Err(err) =
997 priority_tx.try_send(MykoMessage::ViewError(ViewError {
998 tx: tx_id.to_string(),
999 view_id: view_id.to_string(),
1000 message,
1001 }))
1002 {
1003 drop_logger.on_drop("ViewError", &err);
1004 }
1005 }
1006 }
1007 } else {
1008 let message = format!("No registered handler for view: {}", view_id);
1009 log::warn!("{}", message);
1010 if let Err(err) = priority_tx.try_send(MykoMessage::ViewError(ViewError {
1011 tx: tx_id.to_string(),
1012 view_id: view_id.to_string(),
1013 message,
1014 })) {
1015 drop_logger.on_drop("ViewError", &err);
1016 }
1017 }
1018 }
1019
1020 MykoMessage::QueryCancel(CancelSubscription { tx: tx_id }) => {
1021 log::trace!(
1022 "QueryCancel received: client={} tx={}",
1023 session.client_id,
1024 tx_id
1025 );
1026 let tx_id: Arc<str> = tx_id.into();
1027 if let Ok(mut map) = query_ids_by_tx.lock() {
1028 map.remove(&tx_id);
1029 }
1030 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1031 map.remove(&tx_id);
1032 }
1033 session.cancel(&tx_id);
1034 }
1035
1036 MykoMessage::QueryWindow(QueryWindowUpdate { tx, window }) => {
1037 let tx_id: Arc<str> = tx.into();
1038 log::trace!(
1039 "Query window request client={} tx={} has_window={} active_subscriptions={}",
1040 session.client_id,
1041 tx_id,
1042 window.is_some(),
1043 session.subscription_count()
1044 );
1045 session.update_query_window(&tx_id, window);
1046 }
1047 MykoMessage::ViewCancel(CancelSubscription { tx: tx_id }) => {
1048 log::trace!("View cancel: {}", tx_id);
1049 let tx_id: Arc<str> = tx_id.into();
1050 if let Ok(mut map) = view_ids_by_tx.lock() {
1051 map.remove(&tx_id);
1052 }
1053 if let Ok(mut map) = subscribe_started_by_tx.lock() {
1054 map.remove(&tx_id);
1055 }
1056 session.cancel(&tx_id);
1057 }
1058 MykoMessage::ViewWindow(ViewWindowUpdate { tx, window }) => {
1059 let tx_id: Arc<str> = tx.into();
1060 log::trace!("View window update: {}", tx_id);
1061 session.update_view_window(&tx_id, window);
1062 }
1063
1064 MykoMessage::Report(wrapped) => {
1065 let tx_id: Arc<str> = wrapped
1067 .report
1068 .get("tx")
1069 .and_then(|v| v.as_str())
1070 .unwrap_or("unknown")
1071 .into();
1072 let report_id = &wrapped.report_id;
1073
1074 log::trace!(
1075 "Report subscribe request client={} tx={} report_id={} active_subscriptions_before={}",
1076 session.client_id,
1077 tx_id,
1078 report_id,
1079 session.subscription_count()
1080 );
1081
1082 if let Some(report_data) = handler_registry.get_report(report_id) {
1084 let parsed = (report_data.parse)(wrapped.report.clone());
1086 match parsed {
1087 Ok(any_report) => {
1088 let request_context = Arc::new(RequestContext::from_client(
1089 tx_id.clone(),
1090 session.client_id.clone(),
1091 host_id,
1092 ));
1093
1094 match (report_data.cell_factory)(any_report, request_context, ctx) {
1096 Ok(cell) => {
1097 session.subscribe_report(
1098 tx_id,
1099 report_id.as_str().into(),
1100 cell,
1101 );
1102 }
1103 Err(e) => {
1104 log::error!(
1105 "Failed to create report cell for {}: {}",
1106 report_id,
1107 e
1108 );
1109 }
1110 }
1111 }
1112 Err(e) => {
1113 log::error!(
1114 "Failed to parse report {}: {} | payload: {}",
1115 report_id,
1116 e,
1117 serde_json::to_string(&wrapped.report).unwrap_or_default()
1118 );
1119 }
1120 }
1121 } else {
1122 log::warn!("No registered handler for report: {}", report_id);
1123 }
1124 }
1125
1126 MykoMessage::ReportCancel(CancelSubscription { tx: tx_id }) => {
1127 log::trace!(
1128 "ReportCancel received: client={} tx={} active_subscriptions_before={}",
1129 session.client_id,
1130 tx_id,
1131 session.subscription_count()
1132 );
1133 session.cancel(&tx_id.into());
1134 }
1135
1136 MykoMessage::Event(mut event) => {
1137 record_ws_ingest(std::slice::from_ref(&event));
1138 event.sanitize_null_bytes();
1139 normalize_incoming_event(&mut event, &session.client_id);
1140 if let Err(e) = ctx.apply_event(event) {
1141 log::error!(
1142 "Failed to apply event from client {}: {e}",
1143 session.client_id
1144 );
1145 }
1146 }
1147
1148 MykoMessage::EventBatch(mut events) => {
1149 record_ws_ingest(&events);
1150 let incoming = events.len();
1151 if incoming >= 64 {
1152 log::trace!(
1153 "Received event batch from client {} size={}",
1154 session.client_id,
1155 incoming
1156 );
1157 }
1158 for event in &mut events {
1159 event.sanitize_null_bytes();
1160 normalize_incoming_event(event, &session.client_id);
1161 }
1162 match ctx.apply_event_batch(events) {
1163 Ok(applied) => {
1164 log::trace!(
1165 "Applied event batch from client {} size={}",
1166 session.client_id,
1167 applied
1168 );
1169 }
1170 Err(e) => {
1171 log::error!(
1172 "Failed to apply event batch from client {}: {}",
1173 session.client_id,
1174 e
1175 );
1176 }
1177 }
1178 }
1179
1180 MykoMessage::Command(wrapped) => {
1181 let tx_id: Arc<str> = wrapped
1183 .command
1184 .get("tx")
1185 .and_then(|v| v.as_str())
1186 .unwrap_or("unknown")
1187 .into();
1188
1189 let command_id = &wrapped.command_id;
1190
1191 log::trace!("Command {} (tx: {})", command_id, tx_id,);
1192 let received_at = Instant::now();
1193 if let Ok(mut map) = command_started_by_tx.lock() {
1194 map.insert(tx_id.clone(), received_at);
1195 }
1196 if let Err(e) = command_tx.send(CommandJob {
1197 tx_id: tx_id.clone(),
1198 command_id: wrapped.command_id.clone(),
1199 command: wrapped.command.clone(),
1200 received_at,
1201 }) {
1202 log::error!(
1203 "Failed to enqueue command {} for client {} tx={}: {}",
1204 command_id,
1205 session.client_id,
1206 tx_id,
1207 e
1208 );
1209 let error = MykoMessage::CommandError(CommandError {
1210 tx: tx_id.to_string(),
1211 command_id: command_id.to_string(),
1212 message: "Command queue unavailable".to_string(),
1213 });
1214 if let Err(err) = priority_tx.try_send(error) {
1215 drop_logger.on_drop("CommandError", &err);
1216 }
1217 }
1218 }
1219
1220 MykoMessage::Ping(PingData { id, timestamp }) => {
1221 let pong = MykoMessage::Ping(PingData { id, timestamp });
1223 if let Err(e) = priority_tx.try_send(pong) {
1224 drop_logger.on_drop("Ping", &e);
1225 }
1226 }
1227
1228 MykoMessage::QueryResponse(resp) => {
1230 log::warn!(
1231 "Unexpected client message kind=query_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1232 session.client_id,
1233 resp.tx,
1234 resp.sequence,
1235 resp.upserts.len(),
1236 resp.deletes.len(),
1237 session.subscription_count()
1238 );
1239 }
1240 MykoMessage::QueryError(err) => {
1241 log::warn!(
1242 "Unexpected client message kind=query_error client={} tx={} query_id={} message={} active_subscriptions={}",
1243 session.client_id,
1244 err.tx,
1245 err.query_id,
1246 err.message,
1247 session.subscription_count()
1248 );
1249 }
1250 MykoMessage::ViewResponse(resp) => {
1251 log::warn!(
1252 "Unexpected client message kind=view_response client={} tx={} seq={} upserts={} deletes={} active_subscriptions={}",
1253 session.client_id,
1254 resp.tx,
1255 resp.sequence,
1256 resp.upserts.len(),
1257 resp.deletes.len(),
1258 session.subscription_count()
1259 );
1260 }
1261 MykoMessage::ViewError(err) => {
1262 log::warn!(
1263 "Unexpected client message kind=view_error client={} tx={} view_id={} message={} active_subscriptions={}",
1264 session.client_id,
1265 err.tx,
1266 err.view_id,
1267 err.message,
1268 session.subscription_count()
1269 );
1270 }
1271 MykoMessage::ReportResponse(resp) => {
1272 log::warn!(
1273 "Unexpected client message kind=report_response client={} tx={} active_subscriptions={}",
1274 session.client_id,
1275 resp.tx,
1276 session.subscription_count()
1277 );
1278 }
1279 MykoMessage::ReportError(err) => {
1280 log::warn!(
1281 "Unexpected client message kind=report_error client={} tx={} report_id={} message={} active_subscriptions={}",
1282 session.client_id,
1283 err.tx,
1284 err.report_id,
1285 err.message,
1286 session.subscription_count()
1287 );
1288 }
1289 MykoMessage::CommandResponse(resp) => {
1290 if resp.tx.trim().is_empty() {
1291 log::warn!(
1292 "Malformed client message kind=command_response client={} tx=<empty> active_subscriptions={}",
1293 session.client_id,
1294 session.subscription_count()
1295 );
1296 } else {
1297 let correlated = outbound_commands_by_tx
1298 .lock()
1299 .ok()
1300 .and_then(|mut map| map.remove(&resp.tx));
1301 if let Some((command_id, started)) = correlated {
1302 log::debug!(
1303 "Client command response matched outbound command client={} tx={} command_id={} roundtrip_ms={} active_subscriptions={}",
1304 session.client_id,
1305 resp.tx,
1306 command_id,
1307 started.elapsed().as_millis(),
1308 session.subscription_count()
1309 );
1310 } else {
1311 log::warn!(
1312 "Client command response without outbound match client={} tx={} active_subscriptions={}",
1313 session.client_id,
1314 resp.tx,
1315 session.subscription_count()
1316 );
1317 }
1318 }
1319 }
1320 MykoMessage::CommandError(err) => {
1321 if err.tx.trim().is_empty() {
1322 log::warn!(
1323 "Malformed client message kind=command_error client={} tx=<empty> command_id={} message={} active_subscriptions={}",
1324 session.client_id,
1325 err.command_id,
1326 err.message,
1327 session.subscription_count()
1328 );
1329 } else {
1330 let correlated = outbound_commands_by_tx
1331 .lock()
1332 .ok()
1333 .and_then(|mut map| map.remove(&err.tx));
1334 if let Some((command_id, started)) = correlated {
1335 log::warn!(
1336 "Client command error matched outbound command client={} tx={} command_id={} transport_command_id={} message={} roundtrip_ms={} active_subscriptions={}",
1337 session.client_id,
1338 err.tx,
1339 err.command_id,
1340 command_id,
1341 err.message,
1342 started.elapsed().as_millis(),
1343 session.subscription_count()
1344 );
1345 } else {
1346 log::warn!(
1347 "Client command error without outbound match client={} tx={} command_id={} message={} active_subscriptions={}",
1348 session.client_id,
1349 err.tx,
1350 err.command_id,
1351 err.message,
1352 session.subscription_count()
1353 );
1354 }
1355 }
1356 }
1357 MykoMessage::Benchmark(payload) => {
1358 let stats = ws_benchmark_stats();
1359 ensure_ws_benchmark_logger();
1360 stats.message_count.fetch_add(1, Ordering::Relaxed);
1361 let size = payload.to_string().len() as u64;
1363 stats.total_bytes.fetch_add(size, Ordering::Relaxed);
1364 }
1365 MykoMessage::ProtocolSwitch { protocol } => {
1366 log::warn!(
1367 "Unexpected client message kind=protocol_switch_ack client={} protocol={} active_subscriptions={}",
1368 session.client_id,
1369 protocol,
1370 session.subscription_count()
1371 );
1372 }
1373 }
1374
1375 Ok(())
1376 }
1377
1378 fn execute_command_job(
1379 ctx: Arc<CellServerCtx>,
1380 priority_tx: &mpsc::Sender<MykoMessage>,
1381 drop_logger: &DropLogger,
1382 client_id: Arc<str>,
1383 job: CommandJob,
1384 ) {
1385 let host_id = ctx.host_id;
1386 let started = Instant::now();
1387 let queue_wait_ms = started.duration_since(job.received_at).as_millis();
1388 let command_id = job.command_id.clone();
1389
1390 let mut handler_found = false;
1391 for registration in inventory::iter::<CommandHandlerRegistration> {
1392 if registration.command_id == command_id {
1393 handler_found = true;
1394 let executor = (registration.factory)();
1395
1396 let req = Arc::new(RequestContext::from_client(
1397 job.tx_id.clone(),
1398 client_id.clone(),
1399 host_id,
1400 ));
1401 let cmd_id: Arc<str> = Arc::from(command_id.clone());
1402 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
1403 let execute_started = Instant::now();
1404
1405 match executor.execute_from_value(job.command.clone(), cmd_ctx) {
1406 Ok(result) => {
1407 let response = MykoMessage::CommandResponse(CommandResponse {
1408 response: result,
1409 tx: job.tx_id.to_string(),
1410 });
1411 if let Err(e) = priority_tx.try_send(response) {
1412 drop_logger.on_drop("CommandResponse", &e);
1413 }
1414 }
1415 Err(e) => {
1416 let error = MykoMessage::CommandError(CommandError {
1417 tx: job.tx_id.to_string(),
1418 command_id: command_id.clone(),
1419 message: e.message,
1420 });
1421 if let Err(err) = priority_tx.try_send(error) {
1422 drop_logger.on_drop("CommandError", &err);
1423 }
1424 }
1425 }
1426 let execute_ms = execute_started.elapsed().as_millis();
1427 let total_ms = job.received_at.elapsed().as_millis();
1428 log::trace!(
1429 target: "myko_server::ws_perf",
1430 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms={} total_ms={}",
1431 client_id,
1432 job.tx_id,
1433 command_id,
1434 queue_wait_ms,
1435 execute_ms,
1436 total_ms
1437 );
1438 break;
1439 }
1440 }
1441
1442 if !handler_found {
1443 log::warn!("No registered handler for command: {}", command_id);
1444 let error = MykoMessage::CommandError(CommandError {
1445 tx: job.tx_id.to_string(),
1446 command_id: command_id.clone(),
1447 message: format!("Command handler not found: {}", command_id),
1448 });
1449 if let Err(e) = priority_tx.try_send(error) {
1450 drop_logger.on_drop("CommandError", &e);
1451 }
1452 }
1453
1454 if !handler_found {
1455 log::debug!(
1456 target: "myko_server::ws_perf",
1457 "command_exec client={} tx={} command_id={} queue_wait_ms={} execute_ms=0 total_ms={} handler_found=false",
1458 client_id,
1459 job.tx_id,
1460 command_id,
1461 queue_wait_ms,
1462 job.received_at.elapsed().as_millis()
1463 );
1464 }
1465 }
1466}
1467
/// `WsWriter` implementation that forwards outgoing traffic onto in-process channels
/// instead of writing to the socket directly.
struct ChannelWriter {
    /// Channel for `OutboundMessage` values; used by `send` and `send_serialized_command`.
    tx: mpsc::Sender<OutboundMessage>,
    /// Channel for `DeferredOutbound` values; used for report and query/view responses.
    deferred_tx: mpsc::Sender<DeferredOutbound>,
    /// Notified via `on_drop` whenever a `try_send` on either channel fails.
    drop_logger: Arc<DropLogger>,
    /// When `true`, `protocol()` reports `MSGPACK`; otherwise `JSON`.
    /// NOTE(review): shared through an `Arc`, so presumably flipped elsewhere when the
    /// connection switches protocol — confirm against the connection handler.
    use_binary_writer: Arc<AtomicBool>,
}
1478
1479impl WsWriter for ChannelWriter {
1480 fn send(&self, msg: MykoMessage) {
1481 if let Err(e) = self.tx.try_send(OutboundMessage::Message(msg)) {
1483 self.drop_logger.on_drop("message", &e);
1484 }
1485 }
1486
1487 fn protocol(&self) -> myko::client::MykoProtocol {
1488 if self.use_binary_writer.load(Ordering::SeqCst) {
1489 myko::client::MykoProtocol::MSGPACK
1490 } else {
1491 myko::client::MykoProtocol::JSON
1492 }
1493 }
1494
1495 fn send_serialized_command(
1496 &self,
1497 tx: Arc<str>,
1498 command_id: String,
1499 payload: EncodedCommandMessage,
1500 ) {
1501 if let Err(e) = self.tx.try_send(OutboundMessage::SerializedCommand {
1502 tx,
1503 command_id,
1504 payload,
1505 }) {
1506 self.drop_logger.on_drop("serialized_command", &e);
1507 }
1508 }
1509
1510 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
1511 if let Err(e) = self
1512 .deferred_tx
1513 .try_send(DeferredOutbound::Report(tx, output))
1514 {
1515 self.drop_logger.on_drop("ReportResponseDeferred", &e);
1516 }
1517 }
1518
1519 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
1520 if let Err(e) = self
1521 .deferred_tx
1522 .try_send(DeferredOutbound::Query { response, is_view })
1523 {
1524 self.drop_logger.on_drop("QueryResponseDeferred", &e);
1525 }
1526 }
1527}
1528
#[cfg(test)]
mod tests {
    use super::*;

    /// A message handed to `ChannelWriter::send` must arrive on the outbound channel
    /// wrapped in `OutboundMessage::Message`.
    #[test]
    fn test_channel_writer() {
        let (outbound_tx, mut outbound_rx) = mpsc::channel(10);
        // The deferred receiver is bound to a name so it stays alive for the whole test.
        let (deferred_tx, _deferred_rx) = mpsc::channel(10);
        let writer = ChannelWriter {
            tx: outbound_tx,
            deferred_tx,
            drop_logger: Arc::new(DropLogger::new("test-client".into())),
            use_binary_writer: Arc::new(AtomicBool::new(false)),
        };

        writer.send(MykoMessage::Ping(PingData {
            id: "test".to_string(),
            timestamp: 0,
        }));

        let received = outbound_rx.try_recv().unwrap();
        assert!(matches!(
            received,
            OutboundMessage::Message(MykoMessage::Ping(_))
        ));
    }
}