Skip to main content

myko/server/
client_session.rs

1//! Client session management for WebSocket connections
2//!
3//! Each WebSocket connection gets a ClientSession that manages:
4//! - Active subscriptions via SubscriptionGuards
5//! - Message sending to the client
6//! - Automatic cleanup on disconnect
7
8use std::{
9    collections::{HashMap, HashSet},
10    sync::{Arc, Mutex},
11    time::Instant,
12};
13
14use hyphae::{Cell, CellImmutable, Signal, SubscriptionGuard, Watchable};
15
16use crate::{
17    client::MykoProtocol,
18    core::item::AnyItem,
19    report::AnyOutput,
20    wire::{
21        EncodedCommandMessage, ErasedWrappedItem, MykoMessage, QueryChange, QueryResponse,
22        QueryWindow, ReportError, ReportResponse,
23    },
24};
25
26/// Trait for sending WebSocket messages.
27///
28/// Implemented by the actual WebSocket writer to allow abstraction
29/// and easier testing.
30pub trait WsWriter: Send + Sync + 'static {
31    /// Send a message to the client.
32    fn send(&self, msg: MykoMessage);
33
34    /// Return the writer's preferred wire protocol for outbound messages.
35    fn protocol(&self) -> MykoProtocol {
36        MykoProtocol::JSON
37    }
38
39    /// Send a pre-serialized command payload while preserving command metadata.
40    fn send_serialized_command(
41        &self,
42        tx: Arc<str>,
43        command_id: String,
44        payload: EncodedCommandMessage,
45    );
46
47    /// Send a report response while allowing implementations to defer
48    /// expensive serialization/conversion work off the reactive callback path.
49    fn send_report_response(&self, tx: Arc<str>, output: Arc<dyn AnyOutput>) {
50        self.send(MykoMessage::ReportResponse(ReportResponse {
51            response: output.to_value(),
52            tx: tx.to_string(),
53        }));
54    }
55
56    /// Send a query/view response while allowing implementations to defer
57    /// expensive item-to-JSON conversion off the reactive callback path.
58    fn send_query_response(&self, response: PendingQueryResponse, is_view: bool) {
59        let wire = response.into_wire();
60        if is_view {
61            self.send(MykoMessage::ViewResponse(wire));
62        } else {
63            self.send(MykoMessage::QueryResponse(wire));
64        }
65    }
66}
67
68#[derive(Clone)]
69pub struct PendingQueryResponse {
70    pub tx: Arc<str>,
71    pub sequence: u64,
72    pub upsert_items: Vec<Arc<dyn AnyItem>>,
73    pub deletes: Vec<Arc<str>>,
74    pub total_count: usize,
75    pub window: Option<QueryWindow>,
76    pub window_order_ids: Option<Vec<Arc<str>>>,
77}
78
79impl PendingQueryResponse {
80    pub fn into_wire(self) -> QueryResponse {
81        let upserts: Vec<ErasedWrappedItem> = self
82            .upsert_items
83            .iter()
84            .map(|item| ErasedWrappedItem {
85                item: item.clone(),
86                item_type: item.entity_type().into(),
87            })
88            .collect();
89
90        let mut changes: Vec<QueryChange> = Vec::with_capacity(
91            upserts.len() + self.deletes.len() + usize::from(self.window_order_ids.is_some()),
92        );
93        for item in &upserts {
94            changes.push(QueryChange::Upsert { item: item.clone() });
95        }
96        for id in &self.deletes {
97            changes.push(QueryChange::Delete { id: id.clone() });
98        }
99        if let Some(ids) = self.window_order_ids {
100            changes.push(QueryChange::WindowOrder {
101                ids,
102                total_count: self.total_count,
103                window: self.window.clone(),
104            });
105        }
106
107        QueryResponse {
108            tx: self.tx,
109            sequence: self.sequence,
110            changes,
111            upserts,
112            deletes: self.deletes,
113            total_count: Some(self.total_count),
114            window: self.window,
115        }
116    }
117}
118
119/// A WebSocket client session that manages subscriptions.
120///
121/// When dropped, all subscription guards are dropped, automatically
122/// cleaning up all reactive subscriptions.
123pub struct ClientSession<W: WsWriter> {
124    /// Unique client identifier
125    pub client_id: Arc<str>,
126    /// WebSocket writer for sending messages
127    writer: Arc<W>,
128    /// Active subscriptions: tx -> entry
129    subscriptions: HashMap<Arc<str>, SubscriptionEntry>,
130}
131
132enum SubscriptionEntry {
133    Query(QuerySubscription),
134    Guard { _guard: SubscriptionGuard },
135}
136
137struct QuerySubscription {
138    _guard: SubscriptionGuard,
139    state: Arc<Mutex<QuerySubscriptionState>>,
140    kind: QuerySubscriptionKind,
141}
142
143#[derive(Clone, Copy)]
144enum QuerySubscriptionKind {
145    Query,
146    View,
147}
148
149#[derive(Default)]
150struct QuerySubscriptionState {
151    sequence: u64,
152    window: Option<QueryWindow>,
153    all_items: HashMap<Arc<str>, Arc<dyn AnyItem>>,
154    visible_items: HashMap<Arc<str>, Arc<dyn AnyItem>>,
155}
156
157impl<W: WsWriter> ClientSession<W> {
158    /// Create a new client session.
159    pub fn new(client_id: Arc<str>, writer: W) -> Self {
160        Self {
161            client_id,
162            writer: Arc::new(writer),
163            subscriptions: HashMap::new(),
164        }
165    }
166
167    /// Subscribe to a CellMap from a query cell factory.
168    ///
169    /// This is used by WsHandler when the query registration provides a cell factory.
170    pub fn subscribe_query(
171        &mut self,
172        tx: Arc<str>,
173        cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
174        window: Option<QueryWindow>,
175    ) {
176        let had_existing = self.subscriptions.contains_key(&tx);
177        if had_existing {
178            log::trace!(
179                "ClientSession {} replacing existing query subscription tx={} (active_before={})",
180                self.client_id,
181                tx,
182                self.subscriptions.len()
183            );
184        }
185
186        let writer = self.writer.clone();
187        let tx_clone = tx.clone();
188        let tx_for_log = tx_clone.clone();
189        let state = Arc::new(Mutex::new(QuerySubscriptionState {
190            window,
191            ..Default::default()
192        }));
193        let state_for_diffs = state.clone();
194
195        // subscribe_diffs sends Initial first, then subsequent diffs
196        let guard = cell.subscribe_diffs(move |diff| {
197            let response = match state_for_diffs.lock() {
198                Ok(mut state) => state.apply_source_diff(diff, tx_clone.clone()),
199                Err(_) => {
200                    log::error!("Query subscription state poisoned for tx={}", tx_clone);
201                    return;
202                }
203            };
204            if let Some(response) = response {
205                writer.send_query_response(response, false);
206            }
207        });
208
209        self.subscriptions.insert(
210            tx,
211            SubscriptionEntry::Query(QuerySubscription {
212                _guard: guard,
213                state,
214                kind: QuerySubscriptionKind::Query,
215            }),
216        );
217
218        let active = self.subscriptions.len();
219        log::trace!(
220            "ClientSession {} subscribed query tx={} active_subscriptions={}",
221            self.client_id,
222            tx_for_log,
223            active
224        );
225        if active >= 100 && active.is_multiple_of(100) {
226            log::trace!(
227                "ClientSession {} high subscription count: {} (most recent tx={})",
228                self.client_id,
229                active,
230                tx_for_log
231            );
232        }
233    }
234
235    /// Subscribe to a CellMap from a view cell factory.
236    pub fn subscribe_view(
237        &mut self,
238        tx: Arc<str>,
239        cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
240        window: Option<QueryWindow>,
241    ) {
242        self.subscribe_view_with_id(tx, "unknown".into(), cell, window);
243    }
244
245    /// Subscribe to a CellMap from a view cell factory with explicit view id for perf logging.
246    pub fn subscribe_view_with_id(
247        &mut self,
248        tx: Arc<str>,
249        view_id: Arc<str>,
250        cell: hyphae::CellMap<Arc<str>, Arc<dyn AnyItem>, CellImmutable>,
251        window: Option<QueryWindow>,
252    ) {
253        let writer = self.writer.clone();
254        let tx_clone = tx.clone();
255        let tx_for_log = tx_clone.clone();
256        let client_id_for_log = self.client_id.clone();
257        let view_id_for_log = view_id.clone();
258        let subscribed_at = Instant::now();
259        let state = Arc::new(Mutex::new(QuerySubscriptionState {
260            window,
261            ..Default::default()
262        }));
263        let state_for_diffs = state.clone();
264
265        let guard = cell.subscribe_diffs(move |diff| {
266            let response = match state_for_diffs.lock() {
267                Ok(mut state) => state.apply_source_diff(diff, tx_clone.clone()),
268                Err(_) => {
269                    log::error!("View subscription state poisoned for tx={}", tx_clone);
270                    return;
271                }
272            };
273            let Some(response) = response else {
274                return;
275            };
276            log::trace!(
277                "ClientSession {} view tx={} seq={} upserts={} deletes={} changes={} window={:?} total_count={:?}",
278                client_id_for_log,
279                tx_clone,
280                response.sequence,
281                response.upsert_items.len(),
282                response.deletes.len(),
283                response.upsert_items.len()
284                    + response.deletes.len()
285                    + usize::from(response.window_order_ids.is_some()),
286                response.window,
287                response.total_count
288            );
289            if response.sequence == 0 {
290                let first_emit_ms = subscribed_at.elapsed().as_millis();
291                log::trace!(
292                    target: "myko::server::view_perf",
293                    "view_perf client={} view_id={} tx={} first_emit_ms={} initial_rows={} total_count={:?} window={:?}",
294                    client_id_for_log,
295                    view_id_for_log,
296                    tx_clone,
297                    first_emit_ms,
298                    response.upsert_items.len(),
299                    response.total_count,
300                    response.window
301                );
302            }
303            writer.send_query_response(response, true);
304        });
305
306        self.subscriptions.insert(
307            tx,
308            SubscriptionEntry::Query(QuerySubscription {
309                _guard: guard,
310                state,
311                kind: QuerySubscriptionKind::View,
312            }),
313        );
314
315        log::trace!(
316            "ClientSession {} subscribed view view_id={} tx={} active_subscriptions={}",
317            self.client_id,
318            view_id,
319            tx_for_log,
320            self.subscriptions.len()
321        );
322    }
323
324    /// Subscribe to a report cell.
325    pub fn subscribe_report(
326        &mut self,
327        tx: Arc<str>,
328        report_id: Arc<str>,
329        cell: Cell<Arc<dyn AnyOutput>, CellImmutable>,
330    ) {
331        let had_existing = self.subscriptions.contains_key(&tx);
332        if had_existing {
333            log::trace!(
334                "ClientSession {} replacing existing report subscription tx={} report_id={} (active_before={})",
335                self.client_id,
336                tx,
337                report_id,
338                self.subscriptions.len()
339            );
340        }
341
342        let writer = self.writer.clone();
343        let tx_clone = tx.clone();
344        let tx_for_log = tx_clone.clone();
345        let report_id_for_log = report_id.clone();
346
347        let guard = cell.subscribe(move |signal| match &signal {
348            Signal::Value(output) => {
349                writer.send_report_response(tx_clone.clone(), Arc::clone(output.as_ref()));
350            }
351            Signal::Complete => {}
352            Signal::Error(e) => {
353                writer.send(MykoMessage::ReportError(ReportError {
354                    tx: tx_clone.to_string(),
355                    report_id: report_id.to_string(),
356                    message: e.to_string(),
357                }));
358            }
359        });
360
361        self.subscriptions
362            .insert(tx, SubscriptionEntry::Guard { _guard: guard });
363
364        let active = self.subscriptions.len();
365        log::trace!(
366            "ClientSession {} subscribed report tx={} report_id={} active_subscriptions={}",
367            self.client_id,
368            tx_for_log,
369            report_id_for_log,
370            active
371        );
372        if active >= 100 && active.is_multiple_of(100) {
373            log::trace!(
374                "ClientSession {} high subscription count: {} (most recent report tx={}, id={})",
375                self.client_id,
376                active,
377                tx_for_log,
378                report_id_for_log
379            );
380        }
381    }
382
383    /// Update window for an active query subscription.
384    pub fn update_query_window(&mut self, tx: &Arc<str>, window: Option<QueryWindow>) {
385        let Some(SubscriptionEntry::Query(sub)) = self.subscriptions.get(tx) else {
386            log::trace!(
387                "ClientSession {} window update for unknown tx={} (active_subscriptions={})",
388                self.client_id,
389                tx,
390                self.subscriptions.len()
391            );
392            return;
393        };
394
395        let response = match sub.state.lock() {
396            Ok(mut state) => state.apply_window_update(window, tx.clone()),
397            Err(_) => {
398                log::error!(
399                    "Query subscription state poisoned on window update for tx={}",
400                    tx
401                );
402                return;
403            }
404        };
405
406        let Some(response) = response else {
407            log::trace!(
408                "ClientSession {} ignored no-op window update tx={} (active_subscriptions={})",
409                self.client_id,
410                tx,
411                self.subscriptions.len()
412            );
413            return;
414        };
415
416        match sub.kind {
417            QuerySubscriptionKind::Query => self.writer.send_query_response(response, false),
418            QuerySubscriptionKind::View => self.writer.send_query_response(response, true),
419        }
420        log::trace!(
421            "ClientSession {} updated query window tx={} (active_subscriptions={})",
422            self.client_id,
423            tx,
424            self.subscriptions.len()
425        );
426    }
427
428    /// Update window for an active view subscription.
429    pub fn update_view_window(&mut self, tx: &Arc<str>, window: Option<QueryWindow>) {
430        log::trace!(
431            "ClientSession {} requested view window update tx={} window={:?}",
432            self.client_id,
433            tx,
434            window
435        );
436        self.update_query_window(tx, window);
437    }
438
439    /// Cancel a subscription by transaction ID.
440    pub fn cancel(&mut self, tx: &Arc<str>) {
441        let removed = self.subscriptions.remove(tx).is_some();
442        log::trace!(
443            "ClientSession {} cancel tx={} removed={} active_subscriptions={}",
444            self.client_id,
445            tx,
446            removed,
447            self.subscriptions.len()
448        );
449    }
450
451    /// Cancel all subscriptions.
452    pub fn cancel_all(&mut self) {
453        let before = self.subscriptions.len();
454        self.subscriptions.clear();
455        log::trace!(
456            "ClientSession {} cancel_all removed_subscriptions={}",
457            self.client_id,
458            before
459        );
460    }
461
462    /// Get the number of active subscriptions.
463    pub fn subscription_count(&self) -> usize {
464        self.subscriptions.len()
465    }
466
467    /// Check if a subscription exists.
468    pub fn has_subscription(&self, tx: &Arc<str>) -> bool {
469        self.subscriptions.contains_key(tx)
470    }
471}
472
473impl QuerySubscriptionState {
474    fn apply_source_diff(
475        &mut self,
476        diff: &hyphae::MapDiff<Arc<str>, Arc<dyn AnyItem>>,
477        tx: Arc<str>,
478    ) -> Option<PendingQueryResponse> {
479        if self.window.is_none() {
480            return self.apply_source_diff_unwindowed(diff, tx);
481        }
482
483        let previous_total_count = self.all_items.len();
484        let mut changed_ids: HashSet<Arc<str>> = HashSet::new();
485        let mut removed_ids: HashSet<Arc<str>> = HashSet::new();
486        let mut is_initial = false;
487
488        match diff {
489            hyphae::MapDiff::Initial { entries } => {
490                is_initial = true;
491                self.all_items.clear();
492                for (id, item) in entries {
493                    self.all_items.insert(id.clone(), item.clone());
494                    changed_ids.insert(id.clone());
495                }
496            }
497            hyphae::MapDiff::Insert { key, value } => {
498                self.all_items.insert(key.clone(), value.clone());
499                changed_ids.insert(key.clone());
500            }
501            hyphae::MapDiff::Update { key, new_value, .. } => {
502                self.all_items.insert(key.clone(), new_value.clone());
503                changed_ids.insert(key.clone());
504            }
505            hyphae::MapDiff::Remove { key, .. } => {
506                self.all_items.remove(key);
507                removed_ids.insert(key.clone());
508            }
509            hyphae::MapDiff::Batch { changes } => {
510                let batch_size = changes.len();
511                for change in changes {
512                    match change {
513                        hyphae::MapDiff::Initial { entries } => {
514                            is_initial = true;
515                            self.all_items.clear();
516                            for (id, item) in entries {
517                                self.all_items.insert(id.clone(), item.clone());
518                                changed_ids.insert(id.clone());
519                            }
520                        }
521                        hyphae::MapDiff::Insert { key, value } => {
522                            self.all_items.insert(key.clone(), value.clone());
523                            changed_ids.insert(key.clone());
524                        }
525                        hyphae::MapDiff::Update { key, new_value, .. } => {
526                            self.all_items.insert(key.clone(), new_value.clone());
527                            changed_ids.insert(key.clone());
528                        }
529                        hyphae::MapDiff::Remove { key, .. } => {
530                            self.all_items.remove(key);
531                            removed_ids.insert(key.clone());
532                        }
533                        hyphae::MapDiff::Batch { .. } => {}
534                    }
535                }
536                if batch_size >= 64 {
537                    log::trace!(
538                        "ClientSession tx={} apply_source_diff batch_size={} all_items={}",
539                        tx,
540                        batch_size,
541                        self.all_items.len()
542                    );
543                }
544            }
545        }
546
547        // NOTE(ts): MapDiff::Initial = full state replacement — reset sequence
548        // so the client performs replace_all instead of incremental update.
549        if is_initial {
550            self.sequence = 0;
551        }
552
553        self.compute_windowed_response(tx, &changed_ids, &removed_ids, previous_total_count, false)
554    }
555
556    fn apply_source_diff_unwindowed(
557        &mut self,
558        diff: &hyphae::MapDiff<Arc<str>, Arc<dyn AnyItem>>,
559        tx: Arc<str>,
560    ) -> Option<PendingQueryResponse> {
561        let previous_total_count = self.all_items.len();
562        let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
563        let mut deletes: Vec<Arc<str>> = Vec::new();
564
565        // NOTE(ts): MapDiff::Initial means "here is the complete new state" —
566        // reset sequence to 0 so the client performs a full replace_all instead
567        // of an incremental update. Without this, a full-clear Initial (empty
568        // entries) sends an empty diff that the client ignores, leaving stale
569        // items in the UI.
570        let mut is_initial = false;
571
572        match diff {
573            hyphae::MapDiff::Initial { entries } => {
574                is_initial = true;
575                self.all_items.clear();
576                for (id, item) in entries {
577                    self.all_items.insert(id.clone(), item.clone());
578                    upsert_items.push(item.clone());
579                }
580            }
581            hyphae::MapDiff::Insert { key, value } => {
582                self.all_items.insert(key.clone(), value.clone());
583                upsert_items.push(value.clone());
584            }
585            hyphae::MapDiff::Update { key, new_value, .. } => {
586                self.all_items.insert(key.clone(), new_value.clone());
587                upsert_items.push(new_value.clone());
588            }
589            hyphae::MapDiff::Remove { key, .. } => {
590                if self.all_items.remove(key).is_some() {
591                    deletes.push(key.clone());
592                }
593            }
594            hyphae::MapDiff::Batch { changes } => {
595                for change in changes {
596                    match change {
597                        hyphae::MapDiff::Initial { entries } => {
598                            is_initial = true;
599                            self.all_items.clear();
600                            for (id, item) in entries {
601                                self.all_items.insert(id.clone(), item.clone());
602                                upsert_items.push(item.clone());
603                            }
604                        }
605                        hyphae::MapDiff::Insert { key, value } => {
606                            self.all_items.insert(key.clone(), value.clone());
607                            upsert_items.push(value.clone());
608                        }
609                        hyphae::MapDiff::Update { key, new_value, .. } => {
610                            self.all_items.insert(key.clone(), new_value.clone());
611                            upsert_items.push(new_value.clone());
612                        }
613                        hyphae::MapDiff::Remove { key, .. } => {
614                            if self.all_items.remove(key).is_some() {
615                                deletes.push(key.clone());
616                            }
617                        }
618                        hyphae::MapDiff::Batch { .. } => {}
619                    }
620                }
621            }
622        }
623
624        if is_initial {
625            self.sequence = 0;
626        }
627
628        let total_count = self.all_items.len();
629        let total_count_changed = previous_total_count != total_count;
630        let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
631        let should_emit = self.sequence == 0 || visible_changed || total_count_changed;
632
633        log::trace!(
634            "ClientSession tx={} window_decision force_emit=false seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed=false total_count_changed={} should_emit={} total_count={} window=None",
635            tx,
636            self.sequence,
637            upsert_items.len(),
638            upsert_items.len(),
639            deletes.len(),
640            visible_changed,
641            total_count_changed,
642            should_emit,
643            total_count
644        );
645
646        if !should_emit {
647            return None;
648        }
649
650        let seq = self.sequence;
651        self.sequence = self.sequence.saturating_add(1);
652
653        Some(PendingQueryResponse {
654            tx,
655            sequence: seq,
656            upsert_items,
657            deletes,
658            total_count,
659            window: None,
660            window_order_ids: None,
661        })
662    }
663
664    fn apply_window_update(
665        &mut self,
666        window: Option<QueryWindow>,
667        tx: Arc<str>,
668    ) -> Option<PendingQueryResponse> {
669        let same_window = match (&self.window, &window) {
670            (None, None) => true,
671            (Some(current), Some(next)) => {
672                current.offset == next.offset && current.limit == next.limit
673            }
674            _ => false,
675        };
676        if same_window {
677            return None;
678        }
679
680        self.window = window;
681        self.compute_windowed_response(
682            tx,
683            &HashSet::new(),
684            &HashSet::new(),
685            self.all_items.len(),
686            false,
687        )
688    }
689
690    fn compute_windowed_response(
691        &mut self,
692        tx: Arc<str>,
693        changed_ids: &HashSet<Arc<str>>,
694        removed_ids: &HashSet<Arc<str>>,
695        previous_total_count: usize,
696        force_emit: bool,
697    ) -> Option<PendingQueryResponse> {
698        if self.window.is_none() {
699            if self.sequence == 0 {
700                self.visible_items = self.all_items.clone();
701            } else {
702                for id in removed_ids {
703                    self.visible_items.remove(id);
704                }
705                for id in changed_ids {
706                    if let Some(item) = self.all_items.get(id.as_ref()) {
707                        self.visible_items.insert(id.clone(), item.clone());
708                    }
709                }
710            }
711
712            let mut deletes: Vec<Arc<str>> = removed_ids
713                .iter()
714                .filter(|id| !self.all_items.contains_key(id.as_ref()))
715                .cloned()
716                .collect();
717            deletes.sort_unstable();
718
719            let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
720            if self.sequence == 0 {
721                let mut ids: Vec<Arc<str>> = self.all_items.keys().cloned().collect();
722                ids.sort_unstable();
723                for id in ids {
724                    if let Some(item) = self.all_items.get(id.as_ref()) {
725                        upsert_items.push(item.clone());
726                    }
727                }
728            } else {
729                let mut ids: Vec<Arc<str>> = changed_ids.iter().cloned().collect();
730                ids.sort_unstable();
731                for id in ids {
732                    if let Some(item) = self.all_items.get(id.as_ref()) {
733                        upsert_items.push(item.clone());
734                    }
735                }
736            }
737
738            let total_count = self.all_items.len();
739            let window_order_changed = false;
740            let total_count_changed = previous_total_count != total_count;
741            let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
742            let should_emit =
743                force_emit || self.sequence == 0 || visible_changed || total_count_changed;
744
745            log::trace!(
746                "ClientSession tx={} window_decision force_emit={} seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed={} total_count_changed={} should_emit={} total_count={} window={:?}",
747                tx,
748                force_emit,
749                self.sequence,
750                changed_ids.len(),
751                upsert_items.len(),
752                deletes.len(),
753                visible_changed,
754                window_order_changed,
755                total_count_changed,
756                should_emit,
757                total_count,
758                self.window
759            );
760
761            if !should_emit {
762                return None;
763            }
764
765            let seq = self.sequence;
766            self.sequence = self.sequence.saturating_add(1);
767
768            return Some(PendingQueryResponse {
769                tx,
770                sequence: seq,
771                upsert_items,
772                deletes,
773                total_count,
774                window: None,
775                window_order_ids: None,
776            });
777        }
778
779        let mut ordered_ids: Vec<Arc<str>> = self.all_items.keys().cloned().collect();
780        ordered_ids.sort_unstable();
781
782        let visible_ids: Vec<Arc<str>> = if let Some(window) = &self.window {
783            if window.limit == 0 {
784                Vec::new()
785            } else {
786                let start = window.offset.min(ordered_ids.len());
787                let end = start.saturating_add(window.limit).min(ordered_ids.len());
788                ordered_ids[start..end].to_vec()
789            }
790        } else {
791            ordered_ids
792        };
793
794        let previous_visible = self.visible_items.clone();
795        let mut previous_visible_ids: Vec<Arc<str>> = previous_visible.keys().cloned().collect();
796        previous_visible_ids.sort_unstable();
797        let mut next_visible: HashMap<Arc<str>, Arc<dyn AnyItem>> = HashMap::new();
798
799        for id in &visible_ids {
800            if let Some(item) = self.all_items.get(id.as_ref()) {
801                next_visible.insert(id.clone(), item.clone());
802            }
803        }
804
805        let mut deletes: Vec<Arc<str>> = previous_visible
806            .keys()
807            .filter(|id| !next_visible.contains_key(*id))
808            .cloned()
809            .collect();
810        deletes.sort_unstable();
811
812        let mut upsert_items: Vec<Arc<dyn AnyItem>> = Vec::new();
813        for id in &visible_ids {
814            let is_new = !previous_visible.contains_key(id);
815            let is_changed = changed_ids.contains(id);
816            let should_emit = self.sequence == 0 || is_new || is_changed;
817
818            if should_emit && let Some(item) = next_visible.get(id) {
819                upsert_items.push(item.clone());
820            }
821        }
822
823        let total_count = self.all_items.len();
824        let window_order_changed = previous_visible_ids != visible_ids;
825        let total_count_changed = previous_total_count != total_count;
826        let visible_changed = !upsert_items.is_empty() || !deletes.is_empty();
827        let should_emit = force_emit
828            || self.sequence == 0
829            || visible_changed
830            || window_order_changed
831            || total_count_changed;
832
833        log::trace!(
834            "ClientSession tx={} window_decision force_emit={} seq={} changed_ids={} upserts={} deletes={} visible_changed={} window_order_changed={} total_count_changed={} should_emit={} total_count={} window={:?}",
835            tx,
836            force_emit,
837            self.sequence,
838            changed_ids.len(),
839            upsert_items.len(),
840            deletes.len(),
841            visible_changed,
842            window_order_changed,
843            total_count_changed,
844            should_emit,
845            total_count,
846            self.window
847        );
848
849        self.visible_items = next_visible;
850
851        if !should_emit {
852            return None;
853        }
854
855        let seq = self.sequence;
856        self.sequence = self.sequence.saturating_add(1);
857
858        Some(PendingQueryResponse {
859            tx,
860            sequence: seq,
861            upsert_items,
862            deletes,
863            total_count,
864            window: self.window.clone(),
865            window_order_ids: self.window.as_ref().map(|_| visible_ids),
866        })
867    }
868}
869
870impl<W: WsWriter> Drop for ClientSession<W> {
871    fn drop(&mut self) {
872        // All guards drop automatically
873        log::trace!(
874            "ClientSession dropped for client {}, cleaning up {} subscriptions",
875            self.client_id,
876            self.subscriptions.len()
877        );
878    }
879}
880
881#[cfg(test)]
882mod tests {
883    use std::sync::Mutex;
884
885    use hyphae::SelectExt;
886
887    use super::*;
888    use crate::{common::with_id::WithId, store::StoreRegistry};
889
890    // Mock writer that collects messages
891    struct MockWriter {
892        messages: Mutex<Vec<MykoMessage>>,
893    }
894
895    impl MockWriter {
896        fn new() -> Self {
897            Self {
898                messages: Mutex::new(Vec::new()),
899            }
900        }
901
902        fn message_count(&self) -> usize {
903            self.messages.lock().unwrap().len()
904        }
905
906        fn last_message(&self) -> Option<MykoMessage> {
907            self.messages.lock().unwrap().last().cloned()
908        }
909
910        fn messages(&self) -> Vec<MykoMessage> {
911            self.messages.lock().unwrap().clone()
912        }
913    }
914
915    impl WsWriter for MockWriter {
916        fn send(&self, msg: MykoMessage) {
917            self.messages.lock().unwrap().push(msg);
918        }
919
920        fn send_serialized_command(
921            &self,
922            _tx: Arc<str>,
923            _command_id: String,
924            payload: EncodedCommandMessage,
925        ) {
926            let msg = match payload {
927                EncodedCommandMessage::Json(json) => {
928                    serde_json::from_str(&json).expect("Serialized command JSON should decode")
929                }
930                EncodedCommandMessage::Msgpack(bytes) => {
931                    rmp_serde::from_slice(&bytes).expect("Serialized command msgpack should decode")
932                }
933            };
934            self.send(msg);
935        }
936    }
937
938    // Need Arc wrapper for test
939    struct ArcMockWriter(Arc<MockWriter>);
940
941    impl WsWriter for ArcMockWriter {
942        fn send(&self, msg: MykoMessage) {
943            self.0.send(msg);
944        }
945
946        fn send_serialized_command(
947            &self,
948            tx: Arc<str>,
949            command_id: String,
950            payload: EncodedCommandMessage,
951        ) {
952            self.0.send_serialized_command(tx, command_id, payload);
953        }
954    }
955
956    // Test entity
957    #[derive(Debug, Clone, PartialEq, serde::Serialize)]
958    struct TestEntity {
959        id: Arc<str>,
960        name: String,
961    }
962
963    impl WithId for TestEntity {
964        fn id(&self) -> Arc<str> {
965            self.id.clone()
966        }
967    }
968
969    impl AnyItem for TestEntity {
970        fn as_any(&self) -> &dyn std::any::Any {
971            self
972        }
973
974        fn entity_type(&self) -> &'static str {
975            "TestEntity"
976        }
977
978        fn equals(&self, other: &dyn AnyItem) -> bool {
979            other
980                .as_any()
981                .downcast_ref::<Self>()
982                .map(|typed| self == typed)
983                .unwrap_or(false)
984        }
985    }
986
987    fn make_entity(id: &str, name: &str) -> Arc<dyn AnyItem> {
988        Arc::new(TestEntity {
989            id: id.into(),
990            name: name.to_string(),
991        }) as Arc<dyn AnyItem>
992    }
993
994    #[test]
995    fn test_subscribe_query_cellmap() {
996        let registry = Arc::new(StoreRegistry::new());
997        let store = registry.get_or_create("Entity");
998        store.insert("a".into(), make_entity("a", "Alice"));
999        store.insert("b".into(), make_entity("b", "Bob"));
1000
1001        let mock = Arc::new(MockWriter::new());
1002        let writer = ArcMockWriter(mock.clone());
1003        let mut session = ClientSession::new("client-1".into(), writer);
1004
1005        let cellmap = store.select(|_| true);
1006        session.subscribe_query("tx-1".into(), cellmap, None);
1007
1008        // Should have received initial data
1009        assert!(mock.message_count() >= 1);
1010
1011        // Add an entity
1012        store.insert("c".into(), make_entity("c", "Charlie"));
1013        assert!(mock.message_count() >= 2);
1014    }
1015
1016    #[test]
1017    fn test_cancel_subscription() {
1018        let registry = Arc::new(StoreRegistry::new());
1019        let store = registry.get_or_create("Entity");
1020        let mock = Arc::new(MockWriter::new());
1021        let writer = ArcMockWriter(mock.clone());
1022        let mut session = ClientSession::new("client-1".into(), writer);
1023
1024        let cellmap = store.select(|_| true);
1025        session.subscribe_query("tx-1".into(), cellmap, None);
1026        assert_eq!(session.subscription_count(), 1);
1027
1028        session.cancel(&"tx-1".into());
1029        assert_eq!(session.subscription_count(), 0);
1030    }
1031
1032    #[test]
1033    fn test_session_drop_cleanup() {
1034        let registry = Arc::new(StoreRegistry::new());
1035        let store = registry.get_or_create("Entity");
1036        store.insert("a".into(), make_entity("a", "Alice"));
1037
1038        {
1039            let mock = Arc::new(MockWriter::new());
1040            let writer = ArcMockWriter(mock.clone());
1041            let mut session = ClientSession::new("client-1".into(), writer);
1042
1043            let cellmap1 = store.select(|_| true);
1044            let cellmap2 = store.select(|_| true);
1045            session.subscribe_query("tx-1".into(), cellmap1, None);
1046            session.subscribe_query("tx-2".into(), cellmap2, None);
1047
1048            // 2 subscriptions active
1049            assert_eq!(session.subscription_count(), 2);
1050        }
1051        // Session dropped - subscriptions should be cleaned up
1052    }
1053
1054    #[test]
1055    fn test_subscribe_by_id() {
1056        let registry = Arc::new(StoreRegistry::new());
1057        let store = registry.get_or_create("Entity");
1058        store.insert("a".into(), make_entity("a", "Alice"));
1059
1060        let mock = Arc::new(MockWriter::new());
1061        let writer = ArcMockWriter(mock.clone());
1062        let mut session = ClientSession::new("client-1".into(), writer);
1063
1064        let id: Arc<str> = "a".into();
1065        let cellmap = store.select(move |item| *item.id() == *id);
1066        session.subscribe_query("tx-1".into(), cellmap, None);
1067
1068        // Should have received initial data
1069        assert!(mock.message_count() >= 1);
1070
1071        // Update the entity
1072        store.insert("a".into(), make_entity("a", "Alice Updated"));
1073        assert!(mock.message_count() >= 2);
1074    }
1075
1076    #[test]
1077    fn test_delete_sends_deletes_not_upserts() {
1078        let registry = Arc::new(StoreRegistry::new());
1079        let store = registry.get_or_create("Entity");
1080        store.insert("a".into(), make_entity("a", "Alice"));
1081        store.insert("b".into(), make_entity("b", "Bob"));
1082
1083        let mock = Arc::new(MockWriter::new());
1084        let writer = ArcMockWriter(mock.clone());
1085        let mut session = ClientSession::new("client-1".into(), writer);
1086
1087        let cellmap = store.select(|_| true);
1088        session.subscribe_query("tx-1".into(), cellmap, None);
1089
1090        let initial_count = mock.message_count();
1091
1092        // Delete an entity
1093        store.remove(&"a".into());
1094
1095        // Should have received a message with deletes
1096        assert!(mock.message_count() > initial_count);
1097
1098        // Find the delete message (it should be the last one)
1099        let last_msg = mock.last_message().unwrap();
1100        if let MykoMessage::QueryResponse(QueryResponse {
1101            deletes, upserts, ..
1102        }) = last_msg
1103        {
1104            // The delete message should have "a" in deletes and empty upserts
1105            assert!(
1106                deletes.iter().any(|id| id.as_ref() == "a"),
1107                "Delete should contain 'a'"
1108            );
1109            assert!(upserts.is_empty(), "Upserts should be empty for delete");
1110        } else {
1111            panic!("Expected QueryResponse");
1112        }
1113    }
1114
1115    #[test]
1116    fn test_subscribe_view_respects_initial_window() {
1117        let registry = Arc::new(StoreRegistry::new());
1118        let store = registry.get_or_create("Entity");
1119        store.insert("a".into(), make_entity("a", "Alice"));
1120        store.insert("b".into(), make_entity("b", "Bob"));
1121        store.insert("c".into(), make_entity("c", "Charlie"));
1122
1123        let mock = Arc::new(MockWriter::new());
1124        let writer = ArcMockWriter(mock.clone());
1125        let mut session = ClientSession::new("client-1".into(), writer);
1126
1127        let cellmap = store.select(|_| true);
1128        session.subscribe_view(
1129            "tx-view-1".into(),
1130            cellmap,
1131            Some(QueryWindow {
1132                offset: 0,
1133                limit: 1,
1134            }),
1135        );
1136
1137        let msgs = mock.messages();
1138        let first = msgs.into_iter().find_map(|m| match m {
1139            MykoMessage::ViewResponse(r) => Some(r),
1140            _ => None,
1141        });
1142        let Some(resp) = first else {
1143            panic!("expected at least one ViewResponse");
1144        };
1145
1146        assert_eq!(resp.upserts.len(), 1);
1147        assert_eq!(resp.deletes.len(), 0);
1148        assert_eq!(resp.total_count, Some(3));
1149        let Some(window) = resp.window else {
1150            panic!("expected window in response");
1151        };
1152        assert_eq!(window.offset, 0);
1153        assert_eq!(window.limit, 1);
1154    }
1155
1156    #[test]
1157    fn test_view_window_ignores_out_of_window_updates() {
1158        let registry = Arc::new(StoreRegistry::new());
1159        let store = registry.get_or_create("Entity");
1160        store.insert("a".into(), make_entity("a", "Alice"));
1161        store.insert("b".into(), make_entity("b", "Bob"));
1162        store.insert("c".into(), make_entity("c", "Charlie"));
1163
1164        let mock = Arc::new(MockWriter::new());
1165        let writer = ArcMockWriter(mock.clone());
1166        let mut session = ClientSession::new("client-1".into(), writer);
1167
1168        let cellmap = store.select(|_| true);
1169        session.subscribe_view(
1170            "tx-view-1".into(),
1171            cellmap,
1172            Some(QueryWindow {
1173                offset: 0,
1174                limit: 1,
1175            }),
1176        );
1177
1178        // Initial window response
1179        let before = mock.message_count();
1180        assert!(before >= 1);
1181
1182        // "c" is outside window [a] with sorted IDs.
1183        store.insert("c".into(), make_entity("c", "Charlie Updated"));
1184
1185        // No visible/window/count change => no extra response.
1186        let after = mock.message_count();
1187        assert_eq!(after, before);
1188    }
1189}