1pub mod windowing;
4
5use ankql::ast::{
6 ComparisonOperator, Expr, Literal, OrderByItem, OrderDirection, PathExpr, Predicate, Selection,
7};
8use ankurah::changes::ChangeSet;
9use ankurah::core::selection::filter::Filterable;
10use ankurah::core::value::Value;
11use ankurah::{model::View, Context, LiveQuery};
12use ankurah_proto::EntityId;
13use ankurah_signals::{Mut, Peek, Read, Subscribe};
14
15pub use ankql::ast::{OrderByItem as OrderBy, Predicate as Filter};
17pub use ankurah_proto::EntityId as Id;
18pub use ankurah_signals;
19
/// Snapshot of the scroll window that `ScrollManager` publishes to observers.
#[derive(Clone, Debug)]
pub struct VisibleSet<V> {
    /// Items currently in the window, in the order the manager publishes them.
    pub items: Vec<V>,
    /// Where the anchor entity landed after a window slide; `None` when no slide
    /// was processed for this snapshot or the anchor was not found.
    pub intersection: Option<Intersection>,
    /// Whether more items are believed to exist before the first entry of `items`.
    pub has_more_preceding: bool,
    /// Whether more items are believed to exist after the last entry of `items`.
    pub has_more_following: bool,
    /// `true` while the manager is in `ScrollMode::Live`.
    pub should_auto_scroll: bool,
    /// Error message, set when a backward slide's anchor is missing from the new result.
    pub error: Option<String>,
}
40
41impl<V> Default for VisibleSet<V> {
42 fn default() -> Self {
43 Self {
44 items: Vec::new(),
45 intersection: None,
46 has_more_preceding: true,
47 has_more_following: false,
48 should_auto_scroll: true,
49 error: None,
50 }
51 }
52}
53
/// Location of a slide's anchor entity inside the newly loaded window.
#[derive(Clone, Debug)]
pub struct Intersection {
    /// Id of the anchor entity.
    pub entity_id: EntityId,
    /// Position of the anchor within the new `items` list.
    pub index: usize,
    /// Direction of the slide that produced the new window.
    pub direction: LoadDirection,
}
61
/// Direction in which the window is moved when a slide is requested.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadDirection {
    /// Slide toward the preceding (older) side of the list.
    Backward,
    /// Slide toward the following (newer) side of the list.
    Forward,
}
76
/// Bookkeeping for a slide whose query has been issued but whose result has
/// not yet been processed by the live-query subscription.
#[derive(Clone, Debug)]
struct PendingSlide {
    /// Id of the item whose ordering value was used as the query cursor.
    /// Stored for reference only; nothing in this file reads it back.
    #[allow(dead_code)]
    continuation: EntityId,
    /// Id of the item expected to appear in the new window; used to build the
    /// `Intersection` reported to observers.
    anchor: EntityId,
    /// Number of items to keep; the query itself requests `limit + 1` rows so a
    /// surplus row can signal that more data exists.
    limit: usize,
    /// Direction of the slide.
    direction: LoadDirection,
    /// Whether the query ran with the display order reversed.
    reversed_order: bool,
}
92
/// Mode the scroll manager is currently operating in.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScrollMode {
    /// Following the end of the list; published sets carry `should_auto_scroll == true`.
    Live,
    /// Entered when items scroll off below the viewport while live, or when
    /// backward pagination is triggered.
    Backward,
    /// Entered when forward pagination is triggered.
    Forward,
}
100
/// Diagnostics captured by `ScrollManager::on_scroll`.
#[derive(Clone, Debug, Default)]
pub struct ScrollDebugInfo {
    /// Number of window items before the first visible item.
    pub items_above: usize,
    /// Number of window items after the last visible item.
    pub items_below: usize,
    /// Item count at or below which pagination is triggered (one screen of items).
    pub trigger_threshold: usize,
    /// Index of the first visible item within the window.
    pub first_visible_index: usize,
    /// Index of the last visible item within the window.
    pub last_visible_index: usize,
    /// Number of window slides issued so far.
    pub update_count: u32,
    /// Whether a slide was still pending when the snapshot was taken.
    pub update_pending: bool,
}
119
120fn value_to_literal(value: &Value) -> Literal {
126 match value {
127 Value::I16(v) => Literal::I16(*v),
128 Value::I32(v) => Literal::I32(*v),
129 Value::I64(v) => Literal::I64(*v),
130 Value::F64(v) => Literal::F64(*v),
131 Value::Bool(v) => Literal::Bool(*v),
132 Value::String(v) => Literal::String(v.clone()),
133 _ => Literal::String(format!("{:?}", value)),
135 }
136}
137
/// Maintains a sliding window over a live query and publishes it as a `VisibleSet`.
pub struct ScrollManager<V: View + Clone + Send + Sync + 'static> {
    /// Live query backing the window; its selection is replaced when the window slides.
    livequery: LiveQuery<V>,
    /// Base filter supplied to `new`; cursor conditions are AND-ed onto it.
    predicate: Predicate,
    /// Ordering in which items are presented.
    display_order: Vec<OrderByItem>,
    /// Published window state.
    visible_set: Mut<VisibleSet<V>>,
    /// Current scroll mode.
    mode: Mut<ScrollMode>,
    /// Slide issued by `slide_window` whose result the subscription has not yet consumed.
    pending: Mut<Option<PendingSlide>>,
    /// First-visible entity recorded at the last slide; used to debounce repeated triggers.
    last_trigger_oldest_visible: Mut<Option<EntityId>>,
    /// Diagnostics refreshed by `on_scroll`.
    debug_info: Mut<ScrollDebugInfo>,
    /// Number of slides issued so far.
    update_count: std::sync::atomic::AtomicU32,
    /// Row height handed to `windowing::screen_items` (units are not shown in this file).
    minimum_row_height: u32,
    /// Buffer factor; `new` raises it to at least `2.0`.
    buffer_factor: f64,
    /// Viewport height handed to `windowing::screen_items`.
    viewport_height: u32,
    /// Guard returned by `livequery.subscribe`; presumably keeps the subscription
    /// registered for as long as the manager lives — confirm against `ankurah_signals`.
    _subscription: ankurah_signals::SubscriptionGuard,
}
162
163impl<V: View + Clone + Send + Sync + 'static> ScrollManager<V> {
    /// Creates a manager and opens the live query for the initial window.
    ///
    /// The initial query filters by `predicate`, orders by `display_order`, and is
    /// limited to `windowing::live_window_size(screen_items, buffer_factor / 2.0)`
    /// rows, where `screen_items` is
    /// `windowing::screen_items(viewport_height, minimum_row_height)`.
    /// `buffer_factor` is raised to at least `2.0`.
    ///
    /// A subscription is installed on the live query that republishes the visible
    /// set on every changeset and finishes any pending window slide. While the
    /// published set is empty, non-empty results are skipped; `start()` publishes
    /// the first set.
    ///
    /// # Errors
    /// Returns the error from `ctx.query` when the live query cannot be created.
    ///
    /// # Panics
    /// Panics if `predicate` or `display_order` fails to convert.
    pub fn new(
        ctx: &Context,
        predicate: impl TryInto<Predicate, Error = impl std::fmt::Debug>,
        display_order: impl IntoOrderBy,
        minimum_row_height: u32,
        buffer_factor: f64,
        viewport_height: u32,
    ) -> Result<Self, ankurah::error::RetrievalError> {
        let predicate = predicate.try_into().expect("Failed to parse predicate");
        let display_order = display_order
            .into_order_by()
            .expect("Failed to parse order");
        let buffer_factor = buffer_factor.max(2.0);

        let screen_items = windowing::screen_items(viewport_height, minimum_row_height);
        let threshold = buffer_factor / 2.0;
        let limit = windowing::live_window_size(screen_items, threshold);

        let selection = Selection {
            predicate: predicate.clone(),
            order_by: Some(display_order.clone()),
            limit: Some(limit as u64),
        };
        let livequery: LiveQuery<V> = ctx.query(selection)?;

        let visible_set: Mut<VisibleSet<V>> = Mut::new(VisibleSet::default());
        let pending: Mut<Option<PendingSlide>> = Mut::new(None);
        let last_trigger_oldest_visible: Mut<Option<EntityId>> = Mut::new(None);
        let mode: Mut<ScrollMode> = Mut::new(ScrollMode::Live);
        let debug_info: Mut<ScrollDebugInfo> = Mut::new(ScrollDebugInfo {
            trigger_threshold: screen_items,
            ..Default::default()
        });

        // Only the first ORDER BY item decides whether results need flipping.
        let is_desc = display_order
            .first()
            .map(|o| o.direction == OrderDirection::Desc)
            .unwrap_or(false);

        // Handles moved into the subscription closure.
        let visible_set_clone = visible_set.clone();
        let pending_clone = pending.clone();
        let mode_clone = mode.clone();
        let subscription = livequery.subscribe(move |changeset: ChangeSet<V>| {
            tracing::trace!("[subscription] CALLBACK FIRED");

            // While nothing has been published yet, ignore non-empty results:
            // `start()` is responsible for publishing the first set.
            // NOTE(review): this check also fires whenever the published set is
            // legitimately empty (e.g. `start()` ran against an empty result), so
            // later non-empty results would be skipped as well — confirm intended.
            let current = visible_set_clone.peek();
            if current.items.is_empty() && !changeset.resultset.peek().is_empty() {
                tracing::debug!("[subscription] skipping - not yet initialized");
                return;
            }
            let mut items: Vec<V> = changeset.resultset.peek();
            tracing::trace!("[subscription] processing {} items, current has {}", items.len(), current.items.len());

            // A pending slide is consumed only once the result set reports itself
            // loaded; until then the changeset is published without slide handling.
            let pending_slide = pending_clone.peek();
            let should_process_slide = pending_slide.is_some() && changeset.resultset.is_loaded();
            let slide = if should_process_slide {
                pending_clone.set(None);
                pending_slide
            } else {
                None
            };

            // Descending results are flipped before publishing, unless the slide
            // query itself ran with the order reversed.
            let used_reversed_order = slide.as_ref().map(|s| s.reversed_order).unwrap_or(false);
            if is_desc && !used_reversed_order {
                items.reverse();
            }

            let (has_more_preceding, has_more_following, intersection, error) = if let Some(ref slide) = slide {
                let (has_more_preceding, has_more_following) = match slide.direction {
                    LoadDirection::Backward => {
                        // The slide query asks for `limit + 1` rows; a surplus row means
                        // more preceding items exist, and it is dropped from the front.
                        let more_older = if items.len() > slide.limit {
                            items.remove(0);
                            true
                        } else {
                            false
                        };
                        // After a backward slide, following items are always assumed to exist.
                        (more_older, true)
                    }
                    LoadDirection::Forward => {
                        // A surplus row is dropped from the back; without one the window
                        // has reached the end, so the manager returns to Live mode.
                        let more_newer = if items.len() > slide.limit {
                            items.pop();
                            true
                        } else {
                            mode_clone.set(ScrollMode::Live);
                            false
                        };
                        // Preceding items exist if they already did, or if the first
                        // item of the window changed.
                        let more_older = current.has_more_preceding ||
                            current.items.first().map(|old| items.first().map(|new|
                                old.entity().id() != new.entity().id()
                            ).unwrap_or(false)).unwrap_or(false);
                        (more_older, more_newer)
                    }
                };

                tracing::trace!(
                    "[subscription] Looking for anchor {:?} in {} items",
                    slide.anchor, items.len()
                );
                // Report where the anchor landed in the new window.
                let (intersection, error) = match items.iter().position(|item| item.entity().id() == slide.anchor) {
                    Some(index) => {
                        let anchor_ts = items.get(index).and_then(|i| i.entity().value("timestamp"));
                        tracing::trace!(
                            "[subscription] INTERSECTION: anchor {:?} (ts={:?}) found at index {}",
                            slide.anchor, anchor_ts, index
                        );
                        (
                            Some(Intersection {
                                entity_id: slide.anchor,
                                index,
                                direction: slide.direction,
                            }),
                            None
                        )
                    },
                    None => {
                        // A forward slide without overlap is not an error; a backward
                        // slide without its anchor is reported through `error`.
                        if slide.direction == LoadDirection::Forward {
                            tracing::trace!("[subscription] Forward slide: no overlap, jumping to live");
                            (None, None)
                        } else {
                            tracing::error!(
                                "[subscription] INTERSECTION FAILED: anchor {:?} not found in {} items",
                                slide.anchor, items.len()
                            );
                            (None, Some(format!(
                                "Intersection failed: anchor {} not found in result",
                                slide.anchor
                            )))
                        }
                    }
                };

                (has_more_preceding, has_more_following, intersection, error)
            } else {
                // No slide processed: carry the previous flags forward unchanged.
                (current.has_more_preceding, current.has_more_following, None, None)
            };

            tracing::trace!(
                "[subscription] visible_set: items={}, has_more_preceding={}, has_more_following={}",
                items.len(), has_more_preceding, has_more_following
            );

            visible_set_clone.set(VisibleSet {
                items,
                intersection,
                has_more_preceding,
                has_more_following,
                should_auto_scroll: mode_clone.peek() == ScrollMode::Live,
                error,
            });
        });

        Ok(Self {
            livequery,
            predicate,
            display_order,
            visible_set,
            mode,
            pending,
            last_trigger_oldest_visible,
            debug_info,
            update_count: std::sync::atomic::AtomicU32::new(0),
            minimum_row_height,
            buffer_factor,
            viewport_height,
            _subscription: subscription,
        })
    }
359
360 pub async fn start(&self) {
363 self.livequery.wait_initialized().await;
364
365 let mut items: Vec<V> = self.livequery.peek();
366
367 let is_desc = self
368 .display_order
369 .first()
370 .map(|o| o.direction == OrderDirection::Desc)
371 .unwrap_or(false);
372 if is_desc {
373 items.reverse();
374 }
375
376 let live_window = self.live_window_size();
377 let has_more_preceding = items.len() >= live_window;
378
379 tracing::debug!(
380 "[start] initial visible_set: items={}, has_more_preceding={}",
381 items.len(), has_more_preceding
382 );
383
384 self.visible_set.set(VisibleSet {
385 items,
386 intersection: None,
387 has_more_preceding,
388 has_more_following: false,
389 should_auto_scroll: true,
390 error: None,
391 });
392 }
393
394 fn threshold(&self) -> f64 {
396 self.buffer_factor / 2.0
397 }
398
399 fn screen_items(&self) -> usize {
400 windowing::screen_items(self.viewport_height, self.minimum_row_height)
401 }
402
403 fn live_window_size(&self) -> usize {
404 windowing::live_window_size(self.screen_items(), self.threshold())
405 }
406
    /// Returns a read handle to the published visible set.
    pub fn visible_set(&self) -> Read<VisibleSet<V>> {
        self.visible_set.read()
    }
411
    /// Returns the current scroll mode.
    pub fn mode(&self) -> ScrollMode {
        self.mode.peek()
    }
415
416 pub fn current_selection(&self) -> String {
418 let (selection, _version) = self.livequery.selection().peek();
419 format!("{}", selection)
420 }
421
    /// Returns a read handle to the scroll diagnostics.
    pub fn debug_info(&self) -> Read<ScrollDebugInfo> {
        self.debug_info.read()
    }
426
427 pub fn on_scroll(&self, first_visible: EntityId, last_visible: EntityId, scrolling_backward: bool) {
434
435 let current = self.visible_set.peek();
436 let screen = self.screen_items();
437
438 tracing::trace!(
439 "[on_scroll] window: items={}, has_more_preceding={}, has_more_following={}",
440 current.items.len(), current.has_more_preceding, current.has_more_following
441 );
442
443 let first_idx = current.items.iter().position(|item| item.entity().id() == first_visible);
445 let last_idx = current.items.iter().position(|item| item.entity().id() == last_visible);
446
447 let (first_visible_index, last_visible_index) = match (first_idx, last_idx) {
448 (Some(f), Some(l)) => (f, l),
449 _ => {
450 tracing::warn!(
451 "[on_scroll] EARLY RETURN: EntityId not found! first_idx={:?}, last_idx={:?}",
452 first_idx, last_idx
453 );
454 return;
455 }
456 };
457
458 let items_above = first_visible_index;
459 let items_below = current.items.len().saturating_sub(last_visible_index + 1);
460
461 self.debug_info.set(ScrollDebugInfo {
463 items_above,
464 items_below,
465 trigger_threshold: screen,
466 first_visible_index,
467 last_visible_index,
468 update_count: self.update_count.load(std::sync::atomic::Ordering::Relaxed),
469 update_pending: self.pending.peek().is_some(),
470 });
471
472 tracing::trace!(
473 "[on_scroll] indices: first={}, last={}, above={}, below={}",
474 first_visible_index, last_visible_index, items_above, items_below
475 );
476
477 if self.mode.peek() == ScrollMode::Live && items_below > 0 {
480 tracing::debug!("[on_scroll] Exiting Live mode (item scrolled off bottom, items_below={})", items_below);
481 self.mode.set(ScrollMode::Backward);
482 let mut updated = current.clone();
484 updated.should_auto_scroll = false;
485 self.visible_set.set(updated);
486 }
487
488 let at_bottom = !current.has_more_following && items_below == 0;
491 if self.mode.peek() != ScrollMode::Live && at_bottom {
492 tracing::debug!("[on_scroll] Re-entering Live mode (scrolled to bottom)");
493 self.mode.set(ScrollMode::Live);
494 let mut updated = current.clone();
496 updated.should_auto_scroll = true;
497 self.visible_set.set(updated);
498 }
499
500 let backward_threshold = scrolling_backward && items_above <= screen && current.has_more_preceding;
502 let forward_threshold = !scrolling_backward && items_below <= screen && current.has_more_following;
503
504 if backward_threshold {
506 tracing::debug!("[on_scroll] TRIGGERING BACKWARD PAGINATION");
507 self.mode.set(ScrollMode::Backward);
508 self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Backward);
509 } else if forward_threshold {
510 tracing::debug!("[on_scroll] TRIGGERING FORWARD PAGINATION");
511 self.mode.set(ScrollMode::Forward);
512 self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Forward);
513 }
514 }
515
516 fn slide_window(
521 &self,
522 current: &VisibleSet<V>,
523 oldest_visible_index: usize,
524 newest_visible_index: usize,
525 direction: LoadDirection,
526 ) {
527 let buffer = 2 * self.screen_items(); let max_index = current.items.len().saturating_sub(1);
529
530 let (cursor_index, intersection_index, operator, reversed_order) = match direction {
533 LoadDirection::Backward => (
534 (newest_visible_index + buffer).min(max_index),
537 newest_visible_index, ComparisonOperator::LessThanOrEqual,
539 false,
540 ),
541 LoadDirection::Forward => (
542 oldest_visible_index.saturating_sub(buffer),
545 oldest_visible_index,
546 ComparisonOperator::GreaterThanOrEqual,
547 true, ),
549 };
550
551 let visible_span = newest_visible_index.saturating_sub(oldest_visible_index) + 1;
553 let limit = visible_span + 2 * buffer;
554
555 let continuation = current.items.get(cursor_index)
557 .map(|item| item.entity().id())
558 .expect("cursor item must exist");
559 let anchor = current.items.get(intersection_index)
560 .map(|item| item.entity().id())
561 .expect("anchor item must exist");
562
563 let threshold = self.screen_items(); let oldest_visible_entity = current.items.get(oldest_visible_index)
568 .map(|item| item.entity().id());
569
570 tracing::trace!(
571 "[slide_window] DEBOUNCE CHECK: oldest_visible_idx={}, oldest_visible_entity={:?}, array_len={}",
572 oldest_visible_index, oldest_visible_entity, current.items.len()
573 );
574
575 if let Some(last_oldest) = self.last_trigger_oldest_visible.peek() {
576 tracing::trace!(
577 "[slide_window] last_trigger_oldest_visible={:?}",
578 last_oldest
579 );
580
581 let last_idx = current.items.iter()
583 .position(|item| item.entity().id() == last_oldest);
584
585 tracing::trace!(
586 "[slide_window] last_oldest found at index: {:?}",
587 last_idx
588 );
589
590 if let Some(l_idx) = last_idx {
591 let distance = if oldest_visible_index < l_idx {
594 l_idx - oldest_visible_index
595 } else {
596 oldest_visible_index - l_idx
597 };
598 tracing::trace!(
599 "[slide_window] distance={}, threshold={} (l_idx={}, oldest_visible_idx={})",
600 distance, threshold, l_idx, oldest_visible_index
601 );
602 if distance < threshold {
603 tracing::trace!(
604 "[slide_window] DEBOUNCE: scrolled {} items < threshold {}, SKIPPING",
605 distance, threshold
606 );
607 return;
608 }
609 tracing::trace!(
610 "[slide_window] ALLOWING: scrolled {} items >= threshold {}",
611 distance, threshold
612 );
613 } else {
614 tracing::trace!(
616 "[slide_window] ALLOWING: last oldest_visible {:?} NOT FOUND in array of {} items (window shifted)",
617 last_oldest, current.items.len()
618 );
619 }
620 } else {
621 tracing::trace!(
622 "[slide_window] ALLOWING: no last_trigger_oldest_visible (first trigger)"
623 );
624 }
625
626 if let Some(entity) = oldest_visible_entity {
628 tracing::trace!(
629 "[slide_window] Setting last_trigger_oldest_visible = {:?}",
630 entity
631 );
632 self.last_trigger_oldest_visible.set(Some(entity));
633 }
634
635 self.update_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
637
638 self.pending.set(Some(PendingSlide {
639 continuation,
640 anchor,
641 limit,
642 direction,
643 reversed_order,
644 }));
645
646 let predicate = self.build_cursor_predicate(current, cursor_index, operator);
648
649 let order_by = if reversed_order {
651 self.display_order.iter().map(|item| OrderByItem {
652 direction: match item.direction {
653 OrderDirection::Asc => OrderDirection::Desc,
654 OrderDirection::Desc => OrderDirection::Asc,
655 },
656 ..item.clone()
657 }).collect()
658 } else {
659 self.display_order.clone()
660 };
661
662 let selection = Selection {
663 predicate: predicate.clone(),
664 order_by: Some(order_by),
665 limit: Some((limit + 1) as u64), };
667
668 let first_ts = current.items.first().and_then(|i| i.entity().value("timestamp"));
670 let last_ts = current.items.last().and_then(|i| i.entity().value("timestamp"));
671 tracing::trace!(
672 "[slide_window] cursor_index={}, oldest_vis={}, newest_vis={}, max={}, limit={}, first_ts={:?}, last_ts={:?}",
673 cursor_index, oldest_visible_index, newest_visible_index, max_index, limit, first_ts, last_ts
674 );
675 tracing::debug!("[slide_window] update_selection: {}", selection);
676
677 if let Err(e) = self.livequery.update_selection(selection) {
678 tracing::error!("[slide_window] FAILED to update selection: {}", e);
679 }
680 }
681
682 fn build_cursor_predicate(
684 &self,
685 current: &VisibleSet<V>,
686 cursor_index: usize,
687 operator: ComparisonOperator,
688 ) -> Predicate {
689 let Some(cursor_item) = current.items.get(cursor_index) else {
690 return self.predicate.clone();
691 };
692 let Some(order_item) = self.display_order.first() else {
693 return self.predicate.clone();
694 };
695 let field_name = order_item.path.first();
696 let Some(cursor_value) = cursor_item.entity().value(field_name) else {
697 return self.predicate.clone();
698 };
699
700 tracing::trace!(
702 "[build_cursor_predicate] cursor_index={}, entity_id={}, field={}, value={:?}",
703 cursor_index,
704 cursor_item.entity().id(),
705 field_name,
706 cursor_value
707 );
708
709 let cursor_predicate = Predicate::Comparison {
710 left: Box::new(Expr::Path(PathExpr::simple(field_name))),
711 operator,
712 right: Box::new(Expr::Literal(value_to_literal(&cursor_value))),
713 };
714
715 Predicate::And(
716 Box::new(self.predicate.clone()),
717 Box::new(cursor_predicate),
718 )
719 }
720}
721
722pub fn parse_order_by(s: &str) -> Result<Vec<OrderByItem>, String> {
727 use ankql::parser::parse_selection;
728 let selection_str = format!("true ORDER BY {}", s);
729 let selection =
730 parse_selection(&selection_str).map_err(|e| format!("Failed to parse ORDER BY: {}", e))?;
731 selection
732 .order_by
733 .ok_or_else(|| "No ORDER BY parsed".to_string())
734}
735
/// Conversion into a list of order-by items, accepted by `ScrollManager::new`
/// for its `display_order` argument.
pub trait IntoOrderBy {
    /// Consumes `self` and returns the order-by items, or an error message when
    /// the conversion fails.
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String>;
}
739
/// String slices are parsed with `parse_order_by`.
impl IntoOrderBy for &str {
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        parse_order_by(self)
    }
}
745
/// An already-built list of order-by items is returned unchanged; this never fails.
impl IntoOrderBy for Vec<OrderByItem> {
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        Ok(self)
    }
}
751
752pub use ankurah_virtual_scroll_derive::generate_scroll_manager;