1use std::{
9 collections::{HashMap, HashSet},
10 sync::{Arc, Mutex},
11 time::Instant,
12};
13
14use hyphae::{Cell, CellImmutable, Signal, SubscriptionGuard, Watchable};
15
16use crate::{
17 client::MykoProtocol,
18 core::item::AnyItem,
19 report::AnyOutput,
20 wire::{
21 EncodedCommandMessage, ErasedWrappedItem, MykoMessage, QueryChange, QueryResponse,
22 QueryWindow, ReportError, ReportResponse,
23 },
24};
25
26pub trait WsWriter: Send + Sync + 'static {
31 fn send(&self, msg: MykoMessage);
33
34 fn protocol(&self) -> MykoProtocol {
36 MykoProtocol::JSON
37 }
38
39 fn send_serialized_command(
41 &self,
42 tx: Arc<str>,
43 command_id: String,
44 payload: EncodedCommandMessage,
45 );
46
47 fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
50 self.send(MykoMessage::ReportResponse(ReportResponse {
51 response: output.to_value(),
52 tx: tx.to_string(),
53 }));
54 }
55
56 fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
59 let wire = response.into_wire();
60 if is_view {
61 self.send(MykoMessage::ViewResponse(wire));
62 } else {
63 self.send(MykoMessage::QueryResponse(wire));
64 }
65 }
66}
67
/// A query/view response that has been computed but not yet converted to its
/// wire representation (see `into_wire`).
#[derive(Clone)]
pub struct PendingQueryResponse {
    /// Transaction id of the subscription this response belongs to.
    pub tx: Arc<str>,
    /// Sequence number assigned by the subscription state when the response
    /// was produced.
    pub sequence: u64,
    /// Items the client should insert or replace.
    pub upsert_items: Vec<Arc<dyn AnyItem>>,
    /// Ids of items the client should drop.
    pub deletes: Vec<Arc<str>>,
    /// Size of the full result set at the time the response was built.
    pub total_count: usize,
    /// Window the response was computed for; `None` for unwindowed responses.
    pub window: Option<QueryWindow>,
    /// Ids of the visible window in order; when present, `into_wire` appends
    /// a `WindowOrder` change carrying them.
    pub window_order_ids: Option<Vec<Arc<str>>>,
}
78
79impl PendingQueryResponse {
80 pub fn into_wire(self) -> QueryResponse {
81 let upserts: Vec<ErasedWrappedItem> = self
82 .upsert_items
83 .iter()
84 .map(|item| ErasedWrappedItem {
85 item: item.clone(),
86 item_type: item.entity_type().into(),
87 })
88 .collect();
89
90 let mut changes: Vec<QueryChange> = Vec::with_capacity(
91 upserts.len() + self.deletes.len() + usize::from(self.window_order_ids.is_some()),
92 );
93 for item in &upserts {
94 changes.push(QueryChange::Upsert { item: item.clone() });
95 }
96 for id in &self.deletes {
97 changes.push(QueryChange::Delete { id: id.clone() });
98 }
99 if let Some(ids) = self.window_order_ids {
100 changes.push(QueryChange::WindowOrder {
101 ids,
102 total_count: self.total_count,
103 window: self.window.clone(),
104 });
105 }
106
107 QueryResponse {
108 tx: self.tx,
109 sequence: self.sequence,
110 changes,
111 upserts,
112 deletes: self.deletes,
113 total_count: Some(self.total_count),
114 window: self.window,
115 }
116 }
117}
118
/// Per-client session: owns the writer used to reach the client and every
/// subscription registered on the client's behalf.
pub struct ClientSession<W: WsWriter> {
    /// Identifier of the connected client; appears in this file's log output.
    pub client_id: Arc<str>,
    /// Shared writer; subscription callbacks hold clones of this `Arc`.
    writer: Arc<W>,
    /// Active subscriptions keyed by transaction id (`tx`).
    subscriptions: HashMap<Arc<str>, SubscriptionEntry>,
}
131
/// One entry in a session's subscription table.
enum SubscriptionEntry {
    /// A query or view subscription, which carries window/sequence state.
    Query(QuerySubscription),
    /// A subscription that only needs its guard kept alive (used for reports).
    Guard { _guard: SubscriptionGuard },
}
136
/// Bookkeeping for a query or view subscription.
struct QuerySubscription {
    // Held only for its lifetime; presumably dropping it unsubscribes from the
    // source cell — confirm against hyphae's `SubscriptionGuard`.
    _guard: SubscriptionGuard,
    /// State shared with the diff callback; also mutated by window updates.
    state: Arc<Mutex<QuerySubscriptionState>>,
    /// Decides whether window-update responses go out as query or view messages.
    kind: QuerySubscriptionKind,
}
142
/// Which response message a `QuerySubscription` is answered with.
#[derive(Clone, Copy)]
enum QuerySubscriptionKind {
    /// Responses are sent as `MykoMessage::QueryResponse`.
    Query,
    /// Responses are sent as `MykoMessage::ViewResponse`.
    View,
}
148
/// Mutable state of one query/view subscription.
#[derive(Default)]
struct QuerySubscriptionState {
    /// Sequence number the next emitted response will carry; reset to 0 when
    /// an `Initial` diff arrives.
    sequence: u64,
    /// Current window, or `None` when the subscription is unwindowed.
    window: Option<QueryWindow>,
    /// Every item of the source result set, keyed by id.
    all_items: HashMap<Arc<str>, Arc<dyn AnyItem>>,
    /// Items inside the current window as of the last windowed computation.
    /// Only `compute_windowed_response` writes this map.
    visible_items: HashMap<Arc<str>, Arc<dyn AnyItem>>,
}
156
157impl<W: WsWriter> ClientSession<W> {
158 pub fn new(client_id: Arc<str>, writer: W) -> Self {
160 Self {
161 client_id,
162 writer: Arc::new(writer),
163 subscriptions: HashMap::new(),
164 }
165 }
166
167 pub fn subscribe_query(
171 &mut self,
172 tx: Arc<str>,
173 cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
174 window: Option<QueryWindow>,
175 ) {
176 let had_existing = self.subscriptions.contains_key(&tx);
177 if had_existing {
178 log::trace!(
179 "ClientSession {} replacing existing query subscription tx={} (active_before={})",
180 self.client_id,
181 tx,
182 self.subscriptions.len()
183 );
184 }
185
186 let writer = self.writer.clone();
187 let tx_clone = tx.clone();
188 let tx_for_log = tx_clone.clone();
189 let state = Arc::new(Mutex::new(QuerySubscriptionState {
190 window,
191 ..Default::default()
192 }));
193 let state_for_diffs = state.clone();
194
195 let guard = cell.subscribe_diffs(move |diff| {
197 let response = match state_for_diffs.lock() {
198 Ok(mut state) => state.apply_source_diff(diff, tx_clone.clone()),
199 Err(_) => {
200 log::error!("Query subscription state poisoned for tx={}", tx_clone);
201 return;
202 }
203 };
204 if let Some(response) = response {
205 writer.send_query_response(response, false);
206 }
207 });
208
209 self.subscriptions.insert(
210 tx,
211 SubscriptionEntry::Query(QuerySubscription {
212 _guard: guard,
213 state,
214 kind: QuerySubscriptionKind::Query,
215 }),
216 );
217
218 let active = self.subscriptions.len();
219 log::trace!(
220 "ClientSession {} subscribed query tx={} active_subscriptions={}",
221 self.client_id,
222 tx_for_log,
223 active
224 );
225 if active >= 100 && active.is_multiple_of(100) {
226 log::trace!(
227 "ClientSession {} high subscription count: {} (most recent tx={})",
228 self.client_id,
229 active,
230 tx_for_log
231 );
232 }
233 }
234
235 pub fn subscribe_view(
237 &mut self,
238 tx: Arc<str>,
239 cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
240 window: Option<QueryWindow>,
241 ) {
242 self.subscribe_view_with_id(tx, "unknown".into(), cell, window);
243 }
244
245 pub fn subscribe_view_with_id(
247 &mut self,
248 tx: Arc<str>,
249 view_id: Arc<str>,
250 cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
251 window: Option<QueryWindow>,
252 ) {
253 let writer = self.writer.clone();
254 let tx_clone = tx.clone();
255 let tx_for_log = tx_clone.clone();
256 let client_id_for_log = self.client_id.clone();
257 let view_id_for_log = view_id.clone();
258 let subscribed_at = Instant::now();
259 let state = Arc::new(Mutex::new(QuerySubscriptionState {
260 window,
261 ..Default::default()
262 }));
263 let state_for_diffs = state.clone();
264
265 let guard = cell.subscribe_diffs(move |diff| {
266 let response = match state_for_diffs.lock() {
267 Ok(mut state) => state.apply_source_diff(diff, tx_clone.clone()),
268 Err(_) => {
269 log::error!("View subscription state poisoned for tx={}", tx_clone);
270 return;
271 }
272 };
273 let Some(response) = response else {
274 return;
275 };
276 log::trace!(
277 "ClientSession {} view tx={} seq={} upserts={} deletes={} changes={} window={:?} total_count={:?}",
278 client_id_for_log,
279 tx_clone,
280 response.sequence,
281 response.upsert_items.len(),
282 response.deletes.len(),
283 response.upsert_items.len()
284 + response.deletes.len()
285 + usize::from(response.window_order_ids.is_some()),
286 response.window,
287 response.total_count
288 );
289 if response.sequence == 0 {
290 let first_emit_ms = subscribed_at.elapsed().as_millis();
291 log::trace!(
292 target: "myko::server::view_perf",
293 "view_perf client={} view_id={} tx={} first_emit_ms={} initial_rows={} total_count={:?} window={:?}",
294 client_id_for_log,
295 view_id_for_log,
296 tx_clone,
297 first_emit_ms,
298 response.upsert_items.len(),
299 response.total_count,
300 response.window
301 );
302 }
303 writer.send_query_response(response, true);
304 });
305
306 self.subscriptions.insert(
307 tx,
308 SubscriptionEntry::Query(QuerySubscription {
309 _guard: guard,
310 state,
311 kind: QuerySubscriptionKind::View,
312 }),
313 );
314
315 log::trace!(
316 "ClientSession {} subscribed view view_id={} tx={} active_subscriptions={}",
317 self.client_id,
318 view_id,
319 tx_for_log,
320 self.subscriptions.len()
321 );
322 }
323
324 pub fn subscribe_report(
326 &mut self,
327 tx: Arc<str>,
328 report_id: Arc<str>,
329 cell: Cell<Arc<dyn AnyOutput>, CellImmutable>,
330 ) {
331 let had_existing = self.subscriptions.contains_key(&tx);
332 if had_existing {
333 log::trace!(
334 "ClientSession {} replacing existing report subscription tx={} report_id={} (active_before={})",
335 self.client_id,
336 tx,
337 report_id,
338 self.subscriptions.len()
339 );
340 }
341
342 let writer = self.writer.clone();
343 let tx_clone = tx.clone();
344 let tx_for_log = tx_clone.clone();
345 let report_id_for_log = report_id.clone();
346
347 let guard = cell.subscribe(move |signal| match &signal {
348 Signal::Value(output) => {
349 writer.send_report_response(tx_clone.clone(), Arc::clone(output.as_ref()));
350 }
351 Signal::Complete => {}
352 Signal::Error(e) => {
353 writer.send(MykoMessage::ReportError(ReportError {
354 tx: tx_clone.to_string(),
355 report_id: report_id.to_string(),
356 message: e.to_string(),
357 }));
358 }
359 });
360
361 self.subscriptions
362 .insert(tx, SubscriptionEntry::Guard { _guard: guard });
363
364 let active = self.subscriptions.len();
365 log::trace!(
366 "ClientSession {} subscribed report tx={} report_id={} active_subscriptions={}",
367 self.client_id,
368 tx_for_log,
369 report_id_for_log,
370 active
371 );
372 if active >= 100 && active.is_multiple_of(100) {
373 log::trace!(
374 "ClientSession {} high subscription count: {} (most recent report tx={}, id={})",
375 self.client_id,
376 active,
377 tx_for_log,
378 report_id_for_log
379 );
380 }
381 }
382
383 pub fn update_query_window(&mut self, tx: &Arc<str>, window: Option<QueryWindow>) {
385 let Some(SubscriptionEntry::Query(sub)) = self.subscriptions.get(tx) else {
386 log::trace!(
387 "ClientSession {} window update for unknown tx={} (active_subscriptions={})",
388 self.client_id,
389 tx,
390 self.subscriptions.len()
391 );
392 return;
393 };
394
395 let response = match sub.state.lock() {
396 Ok(mut state) => state.apply_window_update(window, tx.clone()),
397 Err(_) => {
398 log::error!(
399 "Query subscription state poisoned on window update for tx={}",
400 tx
401 );
402 return;
403 }
404 };
405
406 let Some(response) = response else {
407 log::trace!(
408 "ClientSession {} ignored no-op window update tx={} (active_subscriptions={})",
409 self.client_id,
410 tx,
411 self.subscriptions.len()
412 );
413 return;
414 };
415
416 match sub.kind {
417 QuerySubscriptionKind::Query => self.writer.send_query_response(response, false),
418 QuerySubscriptionKind::View => self.writer.send_query_response(response, true),
419 }
420 log::trace!(
421 "ClientSession {} updated query window tx={} (active_subscriptions={})",
422 self.client_id,
423 tx,
424 self.subscriptions.len()
425 );
426 }
427
    /// Applies a new window to the view subscription stored under `tx`.
    ///
    /// Traces the request and then delegates to `update_query_window`, which
    /// handles both query and view subscriptions.
    pub fn update_view_window(&mut self, tx: &Arc<str>, window: Option<QueryWindow>) {
        log::trace!(
            "ClientSession {} requested view window update tx={} window={:?}",
            self.client_id,
            tx,
            window
        );
        self.update_query_window(tx, window);
    }
438
439 pub fn cancel(&mut self, tx: &Arc<str>) {
441 let removed = self.subscriptions.remove(tx).is_some();
442 log::trace!(
443 "ClientSession {} cancel tx={} removed={} active_subscriptions={}",
444 self.client_id,
445 tx,
446 removed,
447 self.subscriptions.len()
448 );
449 }
450
451 pub fn cancel_all(&mut self) {
453 let before = self.subscriptions.len();
454 self.subscriptions.clear();
455 log::trace!(
456 "ClientSession {} cancel_all removed_subscriptions={}",
457 self.client_id,
458 before
459 );
460 }
461
    /// Number of subscriptions currently registered on this session.
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
466
    /// Returns `true` when a subscription of any kind is stored under `tx`.
    pub fn has_subscription(&self, tx: &Arc<str>) -> bool {
        self.subscriptions.contains_key(tx)
    }
471}
472
473impl QuerySubscriptionState {
474 fn apply_source_diff(
475 &mut self,
476 diff: &hyphae::MapDiff<Arc<str>, Arc<dyn AnyItem>>,
477 tx: Arc<str>,
478 ) -> Option<PendingQueryResponse> {
479 if self.window.is_none() {
480 return self.apply_source_diff_unwindowed(diff, tx);
481 }
482
483 let previous_total_count = self.all_items.len();
484 let mut changed_ids: HashSet<Arc<str>> = HashSet::new();
485 let mut removed_ids: HashSet<Arc<str>> = HashSet::new();
486 let mut is_initial = false;
487
488 match diff {
489 hyphae::MapDiff::Initial { entries } => {
490 is_initial = true;
491 self.all_items.clear();
492 for (id, item) in entries {
493 self.all_items.insert(id.clone(), item.clone());
494 changed_ids.insert(id.clone());
495 }
496 }
497 hyphae::MapDiff::Insert { key, value } => {
498 self.all_items.insert(key.clone(), value.clone());
499 changed_ids.insert(key.clone());
500 }
501 hyphae::MapDiff::Update { key, new_value, .. } => {
502 self.all_items.insert(key.clone(), new_value.clone());
503 changed_ids.insert(key.clone());
504 }
505 hyphae::MapDiff::Remove { key, .. } => {
506 self.all_items.remove(key);
507 removed_ids.insert(key.clone());
508 }
509 hyphae::MapDiff::Batch { changes } => {
510 let batch_size = changes.len();
511 for change in changes {
512 match change {
513 hyphae::MapDiff::Initial { entries } => {
514 is_initial = true;
515 self.all_items.clear();
516 for (id, item) in entries {
517 self.all_items.insert(id.clone(), item.clone());
518 changed_ids.insert(id.clone());
519 }
520 }
521 hyphae::MapDiff::Insert { key, value } => {
522 self.all_items.insert(key.clone(), value.clone());
523 changed_ids.insert(key.clone());
524 }
525 hyphae::MapDiff::Update { key, new_value, .. } => {
526 self.all_items.insert(key.clone(), new_value.clone());
527 changed_ids.insert(key.clone());
528 }
529 hyphae::MapDiff::Remove { key, .. } => {
530 self.all_items.remove(key);
531 removed_ids.insert(key.clone());
532 }
533 hyphae::MapDiff::Batch { .. } => {}
534 }
535 }
536 if batch_size >= 64 {
537 log::trace!(
538 "ClientSession tx={} apply_source_diff batch_size={} all_items={}",
539 tx,
540 batch_size,
541 self.all_items.len()
542 );
543 }
544 }
545 }
546
547 if is_initial {
550 self.sequence = 0;
551 }
552
553 self.compute_windowed_response(tx, &changed_ids, &removed_ids, previous_total_count, false)
554 }
555
556 fn apply_source_diff_unwindowed(
557 &mut self,
558 diff: &hyphae::MapDiff<Arc<str>, Arc<dyn AnyItem>>,
559 tx: Arc<str>,
560 ) -> Option<PendingQueryResponse> {
561 let previous_total_count = self.all_items.len();
562 let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
563 let mut deletes: Vec<Arc<str>> = Vec::new();
564
565 let mut is_initial = false;
571
572 match diff {
573 hyphae::MapDiff::Initial { entries } => {
574 is_initial = true;
575 self.all_items.clear();
576 for (id, item) in entries {
577 self.all_items.insert(id.clone(), item.clone());
578 upsert_items.push(item.clone());
579 }
580 }
581 hyphae::MapDiff::Insert { key, value } => {
582 self.all_items.insert(key.clone(), value.clone());
583 upsert_items.push(value.clone());
584 }
585 hyphae::MapDiff::Update { key, new_value, .. } => {
586 self.all_items.insert(key.clone(), new_value.clone());
587 upsert_items.push(new_value.clone());
588 }
589 hyphae::MapDiff::Remove { key, .. } => {
590 if self.all_items.remove(key).is_some() {
591 deletes.push(key.clone());
592 }
593 }
594 hyphae::MapDiff::Batch { changes } => {
595 for change in changes {
596 match change {
597 hyphae::MapDiff::Initial { entries } => {
598 is_initial = true;
599 self.all_items.clear();
600 for (id, item) in entries {
601 self.all_items.insert(id.clone(), item.clone());
602 upsert_items.push(item.clone());
603 }
604 }
605 hyphae::MapDiff::Insert { key, value } => {
606 self.all_items.insert(key.clone(), value.clone());
607 upsert_items.push(value.clone());
608 }
609 hyphae::MapDiff::Update { key, new_value, .. } => {
610 self.all_items.insert(key.clone(), new_value.clone());
611 upsert_items.push(new_value.clone());
612 }
613 hyphae::MapDiff::Remove { key, .. } => {
614 if self.all_items.remove(key).is_some() {
615 deletes.push(key.clone());
616 }
617 }
618 hyphae::MapDiff::Batch { .. } => {}
619 }
620 }
621 }
622 }
623
624 if is_initial {
625 self.sequence = 0;
626 }
627
628 let total_count = self.all_items.len();
629 let total_count_changed = previous_total_count != total_count;
630 let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
631 let should_emit = self.sequence == 0 || visible_changed || total_count_changed;
632
633 log::trace!(
634 "ClientSession tx={} window_decision force_emit=false seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed=false total_count_changed={} should_emit={} total_count={} window=None",
635 tx,
636 self.sequence,
637 upsert_items.len(),
638 upsert_items.len(),
639 deletes.len(),
640 visible_changed,
641 total_count_changed,
642 should_emit,
643 total_count
644 );
645
646 if !should_emit {
647 return None;
648 }
649
650 let seq = self.sequence;
651 self.sequence = self.sequence.saturating_add(1);
652
653 Some(PendingQueryResponse {
654 tx,
655 sequence: seq,
656 upsert_items,
657 deletes,
658 total_count,
659 window: None,
660 window_order_ids: None,
661 })
662 }
663
664 fn apply_window_update(
665 &mut self,
666 window: Option<QueryWindow>,
667 tx: Arc<str>,
668 ) -> Option<PendingQueryResponse> {
669 let same_window = match (&self.window, &window) {
670 (None, None) => true,
671 (Some(current), Some(next)) => {
672 current.offset == next.offset && current.limit == next.limit
673 }
674 _ => false,
675 };
676 if same_window {
677 return None;
678 }
679
680 self.window = window;
681 self.compute_windowed_response(
682 tx,
683 &HashSet::new(),
684 &HashSet::new(),
685 self.all_items.len(),
686 false,
687 )
688 }
689
690 fn compute_windowed_response(
691 &mut self,
692 tx: Arc<str>,
693 changed_ids: &HashSet<Arc<str>>,
694 removed_ids: &HashSet<Arc<str>>,
695 previous_total_count: usize,
696 force_emit: bool,
697 ) -> Option<PendingQueryResponse> {
698 if self.window.is_none() {
699 if self.sequence == 0 {
700 self.visible_items = self.all_items.clone();
701 } else {
702 for id in removed_ids {
703 self.visible_items.remove(id);
704 }
705 for id in changed_ids {
706 if let Some(item) = self.all_items.get(id.as_ref()) {
707 self.visible_items.insert(id.clone(), item.clone());
708 }
709 }
710 }
711
712 let mut deletes: Vec<Arc<str>> = removed_ids
713 .iter()
714 .filter(|id| !self.all_items.contains_key(id.as_ref()))
715 .cloned()
716 .collect();
717 deletes.sort_unstable();
718
719 let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
720 if self.sequence == 0 {
721 let mut ids: Vec<Arc<str>> = self.all_items.keys().cloned().collect();
722 ids.sort_unstable();
723 for id in ids {
724 if let Some(item) = self.all_items.get(id.as_ref()) {
725 upsert_items.push(item.clone());
726 }
727 }
728 } else {
729 let mut ids: Vec<Arc<str>> = changed_ids.iter().cloned().collect();
730 ids.sort_unstable();
731 for id in ids {
732 if let Some(item) = self.all_items.get(id.as_ref()) {
733 upsert_items.push(item.clone());
734 }
735 }
736 }
737
738 let total_count = self.all_items.len();
739 let window_order_changed = false;
740 let total_count_changed = previous_total_count != total_count;
741 let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
742 let should_emit =
743 force_emit || self.sequence == 0 || visible_changed || total_count_changed;
744
745 log::trace!(
746 "ClientSession tx={} window_decision force_emit={} seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed={} total_count_changed={} should_emit={} total_count={} window={:?}",
747 tx,
748 force_emit,
749 self.sequence,
750 changed_ids.len(),
751 upsert_items.len(),
752 deletes.len(),
753 visible_changed,
754 window_order_changed,
755 total_count_changed,
756 should_emit,
757 total_count,
758 self.window
759 );
760
761 if !should_emit {
762 return None;
763 }
764
765 let seq = self.sequence;
766 self.sequence = self.sequence.saturating_add(1);
767
768 return Some(PendingQueryResponse {
769 tx,
770 sequence: seq,
771 upsert_items,
772 deletes,
773 total_count,
774 window: None,
775 window_order_ids: None,
776 });
777 }
778
779 let mut ordered_ids: Vec<Arc<str>> = self.all_items.keys().cloned().collect();
780 ordered_ids.sort_unstable();
781
782 let visible_ids: Vec<Arc<str>> = if let Some(window) = &self.window {
783 if window.limit == 0 {
784 Vec::new()
785 } else {
786 let start = window.offset.min(ordered_ids.len());
787 let end = start.saturating_add(window.limit).min(ordered_ids.len());
788 ordered_ids[start..end].to_vec()
789 }
790 } else {
791 ordered_ids
792 };
793
794 let previous_visible = self.visible_items.clone();
795 let mut previous_visible_ids: Vec<Arc<str>> = previous_visible.keys().cloned().collect();
796 previous_visible_ids.sort_unstable();
797 let mut next_visible: HashMap<Arc<str>, Arc<dyn AnyItem>> = HashMap::new();
798
799 for id in &visible_ids {
800 if let Some(item) = self.all_items.get(id.as_ref()) {
801 next_visible.insert(id.clone(), item.clone());
802 }
803 }
804
805 let mut deletes: Vec<Arc<str>> = previous_visible
806 .keys()
807 .filter(|id| !next_visible.contains_key(*id))
808 .cloned()
809 .collect();
810 deletes.sort_unstable();
811
812 let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
813 for id in &visible_ids {
814 let is_new = !previous_visible.contains_key(id);
815 let is_changed = changed_ids.contains(id);
816 let should_emit = self.sequence == 0 || is_new || is_changed;
817
818 if should_emit && let Some(item) = next_visible.get(id) {
819 upsert_items.push(item.clone());
820 }
821 }
822
823 let total_count = self.all_items.len();
824 let window_order_changed = previous_visible_ids != visible_ids;
825 let total_count_changed = previous_total_count != total_count;
826 let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
827 let should_emit = force_emit
828 || self.sequence == 0
829 || visible_changed
830 || window_order_changed
831 || total_count_changed;
832
833 log::trace!(
834 "ClientSession tx={} window_decision force_emit={} seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed={} total_count_changed={} should_emit={} total_count={} window={:?}",
835 tx,
836 force_emit,
837 self.sequence,
838 changed_ids.len(),
839 upsert_items.len(),
840 deletes.len(),
841 visible_changed,
842 window_order_changed,
843 total_count_changed,
844 should_emit,
845 total_count,
846 self.window
847 );
848
849 self.visible_items = next_visible;
850
851 if !should_emit {
852 return None;
853 }
854
855 let seq = self.sequence;
856 self.sequence = self.sequence.saturating_add(1);
857
858 Some(PendingQueryResponse {
859 tx,
860 sequence: seq,
861 upsert_items,
862 deletes,
863 total_count,
864 window: self.window.clone(),
865 window_order_ids: self.window.as_ref().map(|_| visible_ids),
866 })
867 }
868}
869
870impl<W: WsWriter> Drop for ClientSession<W> {
871 fn drop(&mut self) {
872 log::trace!(
874 "ClientSession dropped for client {}, cleaning up {} subscriptions",
875 self.client_id,
876 self.subscriptions.len()
877 );
878 }
879}
880
881#[cfg(test)]
882mod tests {
883 use std::sync::Mutex;
884
885 use hyphae::SelectExt;
886
887 use super::*;
888 use crate::{common::with_id::WithId, store::StoreRegistry};
889
    /// Test writer that records every message it is asked to send.
    struct MockWriter {
        /// Messages in the order they were sent.
        messages: Mutex<Vec<MykoMessage>>,
    }
894
895 impl MockWriter {
896 fn new() -> Self {
897 Self {
898 messages: Mutex::new(Vec::new()),
899 }
900 }
901
902 fn message_count(&self) -> usize {
903 self.messages.lock().unwrap().len()
904 }
905
906 fn last_message(&self) -> Option<MykoMessage> {
907 self.messages.lock().unwrap().last().cloned()
908 }
909
910 fn messages(&self) -> Vec<MykoMessage> {
911 self.messages.lock().unwrap().clone()
912 }
913 }
914
915 impl WsWriter for MockWriter {
916 fn send(&self, msg: MykoMessage) {
917 self.messages.lock().unwrap().push(msg);
918 }
919
920 fn send_serialized_command(
921 &self,
922 _tx: Arc<str>,
923 _command_id: String,
924 payload: EncodedCommandMessage,
925 ) {
926 let msg = match payload {
927 EncodedCommandMessage::Json(json) => {
928 serde_json::from_str(&json).expect("Serialized command JSON should decode")
929 }
930 EncodedCommandMessage::Msgpack(bytes) => {
931 rmp_serde::from_slice(&bytes).expect("Serialized command msgpack should decode")
932 }
933 };
934 self.send(msg);
935 }
936 }
937
    /// Newtype over a shared `MockWriter`, so a test can hand the session a
    /// writer while keeping its own handle to inspect recorded messages.
    struct ArcMockWriter(Arc<MockWriter>);
940
941 impl WsWriter for ArcMockWriter {
942 fn send(&self, msg: MykoMessage) {
943 self.0.send(msg);
944 }
945
946 fn send_serialized_command(
947 &self,
948 tx: Arc<str>,
949 command_id: String,
950 payload: EncodedCommandMessage,
951 ) {
952 self.0.send_serialized_command(tx, command_id, payload);
953 }
954 }
955
    /// Minimal entity used as the item type in these tests.
    #[derive(Debug, Clone, PartialEq, serde::Serialize)]
    struct TestEntity {
        /// Entity id; also returned by the `WithId` implementation.
        id: Arc<str>,
        /// Payload field the tests change to simulate updates.
        name: String,
    }
962
963 impl WithId for TestEntity {
964 fn id(&self) -> Arc<str> {
965 self.id.clone()
966 }
967 }
968
969 impl AnyItem for TestEntity {
970 fn as_any(&self) -> &dyn std::any::Any {
971 self
972 }
973
974 fn entity_type(&self) -> &'static str {
975 "TestEntity"
976 }
977
978 fn equals(&self, other: &dyn AnyItem) -> bool {
979 other
980 .as_any()
981 .downcast_ref::<Self>()
982 .map(|typed| self == typed)
983 .unwrap_or(false)
984 }
985 }
986
987 fn make_entity(id: &str, name: &str) -> Arc<dyn AnyItem> {
988 Arc::new(TestEntity {
989 id: id.into(),
990 name: name.to_string(),
991 }) as Arc<dyn AnyItem>
992 }
993
    /// Subscribing to a query delivers at least one message up front and at
    /// least one more after an item is inserted into the store.
    #[test]
    fn test_subscribe_query_cellmap() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));
        store.insert("b".into(), make_entity("b", "Bob"));

        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        // Select everything in the store.
        let cellmap = store.select(|_| true);
        session.subscribe_query("tx-1".into(), cellmap, None);

        // The subscription itself must already have produced a message.
        assert!(mock.message_count() >= 1);

        // A later insert must reach the client as well.
        store.insert("c".into(), make_entity("c", "Charlie"));
        assert!(mock.message_count() >= 2);
    }
1015
    /// Cancelling a subscription removes it from the session's table.
    #[test]
    fn test_cancel_subscription() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        let cellmap = store.select(|_| true);
        session.subscribe_query("tx-1".into(), cellmap, None);
        assert_eq!(session.subscription_count(), 1);

        session.cancel(&"tx-1".into());
        assert_eq!(session.subscription_count(), 0);
    }
1031
    /// A session holding two subscriptions can be dropped without panicking.
    // NOTE(review): nothing is asserted after the session leaves scope, so
    // this only checks the count before the drop and that dropping is clean.
    #[test]
    fn test_session_drop_cleanup() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));

        {
            let mock = Arc::new(MockWriter::new());
            let writer = ArcMockWriter(mock.clone());
            let mut session = ClientSession::new("client-1".into(), writer);

            let cellmap1 = store.select(|_| true);
            let cellmap2 = store.select(|_| true);
            session.subscribe_query("tx-1".into(), cellmap1, None);
            session.subscribe_query("tx-2".into(), cellmap2, None);

            assert_eq!(session.subscription_count(), 2);
            // `session` is dropped at the end of this scope.
        }
    }
1053
    /// A query selecting a single id delivers the item up front and delivers
    /// again when that item is replaced in the store.
    #[test]
    fn test_subscribe_by_id() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));

        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        // Select only the item whose id is "a".
        let id: Arc<str> = "a".into();
        let cellmap = store.select(move |item| *item.id() == *id);
        session.subscribe_query("tx-1".into(), cellmap, None);

        assert!(mock.message_count() >= 1);

        // Re-inserting under the same id acts as an update.
        store.insert("a".into(), make_entity("a", "Alice Updated"));
        assert!(mock.message_count() >= 2);
    }
1075
    /// Removing an item produces a `QueryResponse` that lists the id under
    /// `deletes` and carries no upserts.
    #[test]
    fn test_delete_sends_deletes_not_upserts() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));
        store.insert("b".into(), make_entity("b", "Bob"));

        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        let cellmap = store.select(|_| true);
        session.subscribe_query("tx-1".into(), cellmap, None);

        // Baseline: messages produced by the subscription itself.
        let initial_count = mock.message_count();

        store.remove(&"a".into());

        // The removal must have produced at least one more message.
        assert!(mock.message_count() > initial_count);

        let last_msg = mock.last_message().unwrap();
        if let MykoMessage::QueryResponse(QueryResponse {
            deletes, upserts, ..
        }) = last_msg
        {
            assert!(
                deletes.iter().any(|id| id.as_ref() == "a"),
                "Delete should contain 'a'"
            );
            assert!(upserts.is_empty(), "Upserts should be empty for delete");
        } else {
            panic!("Expected QueryResponse");
        }
    }
1114
    /// A view subscribed with a window of one item sends exactly one upsert
    /// in its first response, while still reporting the full total count and
    /// echoing the window.
    #[test]
    fn test_subscribe_view_respects_initial_window() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));
        store.insert("b".into(), make_entity("b", "Bob"));
        store.insert("c".into(), make_entity("c", "Charlie"));

        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        let cellmap = store.select(|_| true);
        session.subscribe_view(
            "tx-view-1".into(),
            cellmap,
            Some(QueryWindow {
                offset: 0,
                limit: 1,
            }),
        );

        // Pick the first ViewResponse that was recorded.
        let msgs = mock.messages();
        let first = msgs.into_iter().find_map(|m| match m {
            MykoMessage::ViewResponse(r) => Some(r),
            _ => None,
        });
        let Some(resp) = first else {
            panic!("expected at least one ViewResponse");
        };

        assert_eq!(resp.upserts.len(), 1);
        assert_eq!(resp.deletes.len(), 0);
        // Total count covers the whole result set, not just the window.
        assert_eq!(resp.total_count, Some(3));
        let Some(window) = resp.window else {
            panic!("expected window in response");
        };
        assert_eq!(window.offset, 0);
        assert_eq!(window.limit, 1);
    }
1155
    /// Updating an item that lies outside the view's window does not produce
    /// any additional message.
    #[test]
    fn test_view_window_ignores_out_of_window_updates() {
        let registry = Arc::new(StoreRegistry::new());
        let store = registry.get_or_create("Entity");
        store.insert("a".into(), make_entity("a", "Alice"));
        store.insert("b".into(), make_entity("b", "Bob"));
        store.insert("c".into(), make_entity("c", "Charlie"));

        let mock = Arc::new(MockWriter::new());
        let writer = ArcMockWriter(mock.clone());
        let mut session = ClientSession::new("client-1".into(), writer);

        let cellmap = store.select(|_| true);
        session.subscribe_view(
            "tx-view-1".into(),
            cellmap,
            Some(QueryWindow {
                offset: 0,
                limit: 1,
            }),
        );

        // Baseline: messages produced by the subscription itself.
        let before = mock.message_count();
        assert!(before >= 1);

        // "c" sorts after "a", so it is outside the one-item window at offset 0.
        store.insert("c".into(), make_entity("c", "Charlie Updated"));

        let after = mock.message_count();
        assert_eq!(after, before);
    }
1189}