1pub mod windowing;
4
5use ankql::ast::{
6 ComparisonOperator, Expr, Literal, OrderByItem, OrderDirection, PathExpr, Predicate, Selection,
7};
8use ankurah::changes::ChangeSet;
9use ankurah::core::selection::filter::Filterable;
10use ankurah::core::value::Value;
11use ankurah::{model::View, Context, LiveQuery};
12use ankurah_proto::EntityId;
13use ankurah_signals::{Mut, Peek, Read, Subscribe};
14
15pub use ankql::ast::{OrderByItem as OrderBy, Predicate as Filter};
17pub use ankurah_proto::EntityId as Id;
18pub use ankurah_signals;
19
/// Snapshot of the item window published by [`ScrollManager`].
///
/// A new value is stored whenever the live query reports a change, when
/// `start` completes, and when `on_scroll` toggles auto-scrolling.
#[derive(Clone, Debug)]
pub struct VisibleSet<V> {
    /// Items in the current window. When the first display-order term is
    /// descending, query results are reversed before being stored here
    /// (unless the query itself was issued with reversed order).
    pub items: Vec<V>,
    /// Where the slide anchor landed in `items` after a window slide;
    /// `None` when the update was not a slide or the anchor was not found.
    pub intersection: Option<Intersection>,
    /// Whether more items are believed to exist before `items[0]`.
    /// Gates backward pagination in `on_scroll`.
    pub has_more_preceding: bool,
    /// Whether more items are believed to exist after the last item.
    /// Gates forward pagination in `on_scroll`.
    pub has_more_following: bool,
    /// `true` while the manager is in [`ScrollMode::Live`].
    pub should_auto_scroll: bool,
    /// Set when a backward slide could not find its anchor in the new result.
    pub error: Option<String>,
}
40
41impl<V> Default for VisibleSet<V> {
42 fn default() -> Self {
43 Self {
44 items: Vec::new(),
45 intersection: None,
46 has_more_preceding: true,
47 has_more_following: false,
48 should_auto_scroll: true,
49 error: None,
50 }
51 }
52}
53
/// Describes where the anchor item of a window slide ended up in the new
/// [`VisibleSet::items`].
#[derive(Clone, Debug)]
pub struct Intersection {
    /// Id of the anchor entity that was chosen when the slide was requested.
    pub entity_id: EntityId,
    /// Position of that entity within the new `items` vector.
    pub index: usize,
    /// Direction of the slide that produced this intersection.
    pub direction: LoadDirection,
}
61
/// Direction in which the window is slid when pagination is triggered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadDirection {
    /// Slide toward preceding items (triggered while scrolling backward and
    /// `has_more_preceding` is set).
    Backward,
    /// Slide toward following items (triggered while scrolling forward and
    /// `has_more_following` is set).
    Forward,
}
76
/// Bookkeeping for a window slide whose query has been issued but whose
/// result has not yet been processed by the subscription callback.
#[derive(Clone, Debug)]
struct PendingSlide {
    /// Id of the item at the cursor index used to build the query predicate.
    /// Stored only; nothing in this file reads it back.
    #[allow(dead_code)]
    continuation: EntityId,
    /// Id of the item that is looked up in the new result to produce the
    /// [`Intersection`].
    anchor: EntityId,
    /// Maximum number of items to keep; the query asks for `limit + 1` so an
    /// extra row signals that more data exists.
    limit: usize,
    /// Direction of the slide.
    direction: LoadDirection,
    /// Whether the query was issued with every ORDER BY direction flipped.
    reversed_order: bool,
}
92
/// High-level scrolling state of a [`ScrollManager`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScrollMode {
    /// Initial mode; while active, published sets have
    /// `should_auto_scroll == true`.
    Live,
    /// Entered when items scroll off the bottom in live mode or when backward
    /// pagination is triggered.
    Backward,
    /// Entered when forward pagination is triggered.
    Forward,
}
100
/// Diagnostic counters refreshed on every `on_scroll` call.
#[derive(Clone, Debug, Default)]
pub struct ScrollDebugInfo {
    /// Number of window items before the first visible one.
    pub items_above: usize,
    /// Number of window items after the last visible one.
    pub items_below: usize,
    /// Item count at or below which pagination is triggered (one screen of items).
    pub trigger_threshold: usize,
    /// Index of the first visible item within the window.
    pub first_visible_index: usize,
    /// Index of the last visible item within the window.
    pub last_visible_index: usize,
    /// Number of window slides issued so far.
    pub update_count: u32,
    /// Whether a slide is still waiting for its query result.
    pub update_pending: bool,
}
119
120fn value_to_literal(value: &Value) -> Literal {
126 match value {
127 Value::I16(v) => Literal::I16(*v),
128 Value::I32(v) => Literal::I32(*v),
129 Value::I64(v) => Literal::I64(*v),
130 Value::F64(v) => Literal::F64(*v),
131 Value::Bool(v) => Literal::Bool(*v),
132 Value::String(v) => Literal::String(v.clone()),
133 _ => Literal::String(format!("{:?}", value)),
135 }
136}
137
/// Maintains a sliding window over a live query and publishes it as a
/// [`VisibleSet`] signal for a virtual-scrolling renderer.
pub struct ScrollManager<V: View + Clone + Send + Sync + 'static> {
    /// The underlying live query; its selection is replaced on every slide.
    livequery: LiveQuery<V>,
    /// Base predicate; slides AND it with a cursor comparison.
    predicate: Predicate,
    /// Order in which items are displayed; slides may query with it reversed.
    display_order: Vec<OrderByItem>,
    /// The published window state.
    visible_set: Mut<VisibleSet<V>>,
    /// Current scrolling mode.
    mode: Mut<ScrollMode>,
    /// Set by `start`; the subscription callback ignores changes until then.
    initialized: Mut<bool>,
    /// The slide whose query result has not been processed yet, if any.
    pending: Mut<Option<PendingSlide>>,
    /// Oldest visible entity at the time of the last slide trigger; used to
    /// debounce repeated triggers.
    last_trigger_oldest_visible: Mut<Option<EntityId>>,
    /// Diagnostics refreshed by `on_scroll`.
    debug_info: Mut<ScrollDebugInfo>,
    /// Number of slides issued so far.
    update_count: std::sync::atomic::AtomicU32,
    /// Row height passed to `windowing::screen_items`.
    minimum_row_height: u32,
    /// Buffer multiplier, clamped to at least 2.0 in `new`.
    buffer_factor: f64,
    /// Viewport height passed to `windowing::screen_items`.
    viewport_height: u32,
    /// Guard returned by `livequery.subscribe`; presumably keeps the
    /// subscription alive for the manager's lifetime — confirm against
    /// `ankurah_signals`.
    _subscription: ankurah_signals::SubscriptionGuard,
}
164
165impl<V: View + Clone + Send + Sync + 'static> ScrollManager<V> {
    /// Creates a scroll manager, issues the initial live query and subscribes
    /// to its changes.
    ///
    /// The initial selection combines `predicate`, `display_order` and a limit
    /// computed by `windowing::live_window_size` from the number of rows that
    /// fit the viewport and half of `buffer_factor`. `buffer_factor` is
    /// clamped to at least `2.0`.
    ///
    /// The subscription callback ignores every change until [`Self::start`]
    /// has marked the manager as initialized.
    ///
    /// # Errors
    /// Returns the error produced by `ctx.query` for the initial selection.
    ///
    /// # Panics
    /// Panics if `predicate` or `display_order` cannot be converted.
    pub fn new(
        ctx: &Context,
        predicate: impl TryInto<Predicate, Error = impl std::fmt::Debug>,
        display_order: impl IntoOrderBy,
        minimum_row_height: u32,
        buffer_factor: f64,
        viewport_height: u32,
    ) -> Result<Self, ankurah::error::RetrievalError> {
        let predicate = predicate.try_into().expect("Failed to parse predicate");
        let display_order = display_order
            .into_order_by()
            .expect("Failed to parse order");
        let buffer_factor = buffer_factor.max(2.0);

        let screen_items = windowing::screen_items(viewport_height, minimum_row_height);
        // Same formula as `threshold()`, which is unavailable before `Self` exists.
        let threshold = buffer_factor / 2.0;
        let limit = windowing::live_window_size(screen_items, threshold);

        let selection = Selection {
            predicate: predicate.clone(),
            order_by: Some(display_order.clone()),
            limit: Some(limit as u64),
        };
        let livequery: LiveQuery<V> = ctx.query(selection)?;

        let visible_set: Mut<VisibleSet<V>> = Mut::new(VisibleSet::default());
        let pending: Mut<Option<PendingSlide>> = Mut::new(None);
        let last_trigger_oldest_visible: Mut<Option<EntityId>> = Mut::new(None);
        let mode: Mut<ScrollMode> = Mut::new(ScrollMode::Live);
        let initialized: Mut<bool> = Mut::new(false);
        let debug_info: Mut<ScrollDebugInfo> = Mut::new(ScrollDebugInfo {
            trigger_threshold: screen_items,
            ..Default::default()
        });

        // Only the first ORDER BY term decides whether results need flipping.
        let is_desc = display_order
            .first()
            .map(|o| o.direction == OrderDirection::Desc)
            .unwrap_or(false);

        // Handles moved into the subscription callback.
        let visible_set_clone = visible_set.clone();
        let pending_clone = pending.clone();
        let mode_clone = mode.clone();
        let initialized_clone = initialized.clone();
        let subscription = livequery.subscribe(move |changeset: ChangeSet<V>| {
            tracing::trace!("[subscription] CALLBACK FIRED");

            // `start` publishes the initial set itself; ignore changes until then.
            if !initialized_clone.peek() {
                tracing::debug!("[subscription] skipping - not yet initialized");
                return;
            }

            let current = visible_set_clone.peek();
            let mut items: Vec<V> = changeset.resultset.peek();
            tracing::trace!("[subscription] processing {} items, current has {}", items.len(), current.items.len());

            // Consume the pending slide only once the result set reports loaded;
            // until then the slide stays pending and this change is treated as
            // an ordinary update.
            let pending_slide = pending_clone.peek();
            let should_process_slide = pending_slide.is_some() && changeset.resultset.is_loaded();
            let slide = if should_process_slide {
                pending_clone.set(None);
                pending_slide
            } else {
                None
            };

            // Descending results are flipped before publishing, except when
            // the slide query was itself issued with reversed ordering.
            let used_reversed_order = slide.as_ref().map(|s| s.reversed_order).unwrap_or(false);
            if is_desc && !used_reversed_order {
                items.reverse();
            }

            let (has_more_preceding, has_more_following, intersection, error) = if let Some(ref slide) = slide {
                let (has_more_preceding, has_more_following) = match slide.direction {
                    LoadDirection::Backward => {
                        // An item beyond `slide.limit` means more preceding
                        // items exist; it is dropped from the front.
                        let more_older = if items.len() > slide.limit {
                            items.remove(0);
                            true
                        } else {
                            false
                        };
                        // After a backward slide, following items are always
                        // reported as available.
                        (more_older, true)
                    }
                    LoadDirection::Forward => {
                        // An item beyond `slide.limit` means more following
                        // items exist; it is dropped from the back. Otherwise
                        // the window has caught up and the mode returns to Live.
                        let more_newer = if items.len() > slide.limit {
                            items.pop();
                            true
                        } else {
                            mode_clone.set(ScrollMode::Live);
                            false
                        };
                        // Preceding items exist if they did before, or if the
                        // first item of the window changed.
                        let more_older = current.has_more_preceding ||
                            current.items.first().map(|old| items.first().map(|new|
                                old.entity().id() != new.entity().id()
                            ).unwrap_or(false)).unwrap_or(false);
                        (more_older, more_newer)
                    }
                };

                tracing::trace!(
                    "[subscription] Looking for anchor {:?} in {} items",
                    slide.anchor, items.len()
                );
                // Locate the anchor in the new window to report the intersection.
                let (intersection, error) = match items.iter().position(|item| item.entity().id() == slide.anchor) {
                    Some(index) => {
                        let anchor_ts = items.get(index).and_then(|i| i.entity().value("timestamp"));
                        tracing::trace!(
                            "[subscription] INTERSECTION: anchor {:?} (ts={:?}) found at index {}",
                            slide.anchor, anchor_ts, index
                        );
                        (
                            Some(Intersection {
                                entity_id: slide.anchor,
                                index,
                                direction: slide.direction,
                            }),
                            None
                        )
                    },
                    None => {
                        // A missing anchor is tolerated for forward slides but
                        // reported as an error for backward slides.
                        if slide.direction == LoadDirection::Forward {
                            tracing::trace!("[subscription] Forward slide: no overlap, jumping to live");
                            (None, None)
                        } else {
                            tracing::error!(
                                "[subscription] INTERSECTION FAILED: anchor {:?} not found in {} items",
                                slide.anchor, items.len()
                            );
                            (None, Some(format!(
                                "Intersection failed: anchor {} not found in result",
                                slide.anchor
                            )))
                        }
                    }
                };

                (has_more_preceding, has_more_following, intersection, error)
            } else {
                // Not a slide: keep the previous paging flags.
                (current.has_more_preceding, current.has_more_following, None, None)
            };

            tracing::trace!(
                "[subscription] visible_set: items={}, has_more_preceding={}, has_more_following={}",
                items.len(), has_more_preceding, has_more_following
            );

            visible_set_clone.set(VisibleSet {
                items,
                intersection,
                has_more_preceding,
                has_more_following,
                should_auto_scroll: mode_clone.peek() == ScrollMode::Live,
                error,
            });
        });

        Ok(Self {
            livequery,
            predicate,
            display_order,
            visible_set,
            mode,
            initialized,
            pending,
            last_trigger_oldest_visible,
            debug_info,
            update_count: std::sync::atomic::AtomicU32::new(0),
            minimum_row_height,
            buffer_factor,
            viewport_height,
            _subscription: subscription,
        })
    }
365
366 pub async fn start(&self) {
369 self.livequery.wait_initialized().await;
370
371 let mut items: Vec<V> = self.livequery.peek();
372
373 let is_desc = self
374 .display_order
375 .first()
376 .map(|o| o.direction == OrderDirection::Desc)
377 .unwrap_or(false);
378 if is_desc {
379 items.reverse();
380 }
381
382 let live_window = self.live_window_size();
383 let has_more_preceding = items.len() >= live_window;
384
385 tracing::debug!(
386 "[start] initial visible_set: items={}, has_more_preceding={}",
387 items.len(), has_more_preceding
388 );
389
390 self.visible_set.set(VisibleSet {
391 items,
392 intersection: None,
393 has_more_preceding,
394 has_more_following: false,
395 should_auto_scroll: true,
396 error: None,
397 });
398
399 self.initialized.set(true);
401 }
402
    /// Half of `buffer_factor`; passed to `windowing::live_window_size`.
    fn threshold(&self) -> f64 {
        self.buffer_factor / 2.0
    }
407
    /// Item count for one screen, as computed by `windowing::screen_items`
    /// from the viewport height and the minimum row height.
    fn screen_items(&self) -> usize {
        windowing::screen_items(self.viewport_height, self.minimum_row_height)
    }
411
    /// Size of the live-mode window, as computed by
    /// `windowing::live_window_size` from `screen_items()` and `threshold()`.
    fn live_window_size(&self) -> usize {
        windowing::live_window_size(self.screen_items(), self.threshold())
    }
415
    /// Returns a read handle to the published [`VisibleSet`] signal.
    pub fn visible_set(&self) -> Read<VisibleSet<V>> {
        self.visible_set.read()
    }
420
    /// Returns the current [`ScrollMode`].
    pub fn mode(&self) -> ScrollMode {
        self.mode.peek()
    }
424
425 pub fn current_selection(&self) -> String {
427 let (selection, _version) = self.livequery.selection().peek();
428 format!("{}", selection)
429 }
430
    /// Returns a read handle to the [`ScrollDebugInfo`] signal.
    pub fn debug_info(&self) -> Read<ScrollDebugInfo> {
        self.debug_info.read()
    }
435
436 pub fn on_scroll(&self, first_visible: EntityId, last_visible: EntityId, scrolling_backward: bool) {
443
444 let current = self.visible_set.peek();
445 let screen = self.screen_items();
446
447 tracing::trace!(
448 "[on_scroll] window: items={}, has_more_preceding={}, has_more_following={}",
449 current.items.len(), current.has_more_preceding, current.has_more_following
450 );
451
452 let first_idx = current.items.iter().position(|item| item.entity().id() == first_visible);
454 let last_idx = current.items.iter().position(|item| item.entity().id() == last_visible);
455
456 let (first_visible_index, last_visible_index) = match (first_idx, last_idx) {
457 (Some(f), Some(l)) => (f, l),
458 _ => {
459 tracing::warn!(
460 "[on_scroll] EARLY RETURN: EntityId not found! first_idx={:?}, last_idx={:?}",
461 first_idx, last_idx
462 );
463 return;
464 }
465 };
466
467 let items_above = first_visible_index;
468 let items_below = current.items.len().saturating_sub(last_visible_index + 1);
469
470 self.debug_info.set(ScrollDebugInfo {
472 items_above,
473 items_below,
474 trigger_threshold: screen,
475 first_visible_index,
476 last_visible_index,
477 update_count: self.update_count.load(std::sync::atomic::Ordering::Relaxed),
478 update_pending: self.pending.peek().is_some(),
479 });
480
481 tracing::trace!(
482 "[on_scroll] indices: first={}, last={}, above={}, below={}",
483 first_visible_index, last_visible_index, items_above, items_below
484 );
485
486 if self.mode.peek() == ScrollMode::Live && items_below > 0 {
489 tracing::debug!("[on_scroll] Exiting Live mode (item scrolled off bottom, items_below={})", items_below);
490 self.mode.set(ScrollMode::Backward);
491 let mut updated = current.clone();
493 updated.should_auto_scroll = false;
494 self.visible_set.set(updated);
495 }
496
497 let at_bottom = !current.has_more_following && items_below == 0;
500 if self.mode.peek() != ScrollMode::Live && at_bottom {
501 tracing::debug!("[on_scroll] Re-entering Live mode (scrolled to bottom)");
502 self.mode.set(ScrollMode::Live);
503 let mut updated = current.clone();
505 updated.should_auto_scroll = true;
506 self.visible_set.set(updated);
507 }
508
509 let backward_threshold = scrolling_backward && items_above <= screen && current.has_more_preceding;
511 let forward_threshold = !scrolling_backward && items_below <= screen && current.has_more_following;
512
513 if backward_threshold {
515 tracing::debug!("[on_scroll] TRIGGERING BACKWARD PAGINATION");
516 self.mode.set(ScrollMode::Backward);
517 self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Backward);
518 } else if forward_threshold {
519 tracing::debug!("[on_scroll] TRIGGERING FORWARD PAGINATION");
520 self.mode.set(ScrollMode::Forward);
521 self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Forward);
522 }
523 }
524
525 fn slide_window(
530 &self,
531 current: &VisibleSet<V>,
532 oldest_visible_index: usize,
533 newest_visible_index: usize,
534 direction: LoadDirection,
535 ) {
536 let buffer = 2 * self.screen_items(); let max_index = current.items.len().saturating_sub(1);
538
539 let (cursor_index, intersection_index, operator, reversed_order) = match direction {
542 LoadDirection::Backward => (
543 (newest_visible_index + buffer).min(max_index),
546 newest_visible_index, ComparisonOperator::LessThanOrEqual,
548 false,
549 ),
550 LoadDirection::Forward => (
551 oldest_visible_index.saturating_sub(buffer),
554 oldest_visible_index,
555 ComparisonOperator::GreaterThanOrEqual,
556 true, ),
558 };
559
560 let visible_span = newest_visible_index.saturating_sub(oldest_visible_index) + 1;
562 let limit = visible_span + 2 * buffer;
563
564 let continuation = current.items.get(cursor_index)
566 .map(|item| item.entity().id())
567 .expect("cursor item must exist");
568 let anchor = current.items.get(intersection_index)
569 .map(|item| item.entity().id())
570 .expect("anchor item must exist");
571
572 let threshold = self.screen_items(); let oldest_visible_entity = current.items.get(oldest_visible_index)
577 .map(|item| item.entity().id());
578
579 tracing::trace!(
580 "[slide_window] DEBOUNCE CHECK: oldest_visible_idx={}, oldest_visible_entity={:?}, array_len={}",
581 oldest_visible_index, oldest_visible_entity, current.items.len()
582 );
583
584 if let Some(last_oldest) = self.last_trigger_oldest_visible.peek() {
585 tracing::trace!(
586 "[slide_window] last_trigger_oldest_visible={:?}",
587 last_oldest
588 );
589
590 let last_idx = current.items.iter()
592 .position(|item| item.entity().id() == last_oldest);
593
594 tracing::trace!(
595 "[slide_window] last_oldest found at index: {:?}",
596 last_idx
597 );
598
599 if let Some(l_idx) = last_idx {
600 let distance = if oldest_visible_index < l_idx {
603 l_idx - oldest_visible_index
604 } else {
605 oldest_visible_index - l_idx
606 };
607 tracing::trace!(
608 "[slide_window] distance={}, threshold={} (l_idx={}, oldest_visible_idx={})",
609 distance, threshold, l_idx, oldest_visible_index
610 );
611 if distance < threshold {
612 tracing::trace!(
613 "[slide_window] DEBOUNCE: scrolled {} items < threshold {}, SKIPPING",
614 distance, threshold
615 );
616 return;
617 }
618 tracing::trace!(
619 "[slide_window] ALLOWING: scrolled {} items >= threshold {}",
620 distance, threshold
621 );
622 } else {
623 tracing::trace!(
625 "[slide_window] ALLOWING: last oldest_visible {:?} NOT FOUND in array of {} items (window shifted)",
626 last_oldest, current.items.len()
627 );
628 }
629 } else {
630 tracing::trace!(
631 "[slide_window] ALLOWING: no last_trigger_oldest_visible (first trigger)"
632 );
633 }
634
635 if let Some(entity) = oldest_visible_entity {
637 tracing::trace!(
638 "[slide_window] Setting last_trigger_oldest_visible = {:?}",
639 entity
640 );
641 self.last_trigger_oldest_visible.set(Some(entity));
642 }
643
644 self.update_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
646
647 self.pending.set(Some(PendingSlide {
648 continuation,
649 anchor,
650 limit,
651 direction,
652 reversed_order,
653 }));
654
655 let predicate = self.build_cursor_predicate(current, cursor_index, operator);
657
658 let order_by = if reversed_order {
660 self.display_order.iter().map(|item| OrderByItem {
661 direction: match item.direction {
662 OrderDirection::Asc => OrderDirection::Desc,
663 OrderDirection::Desc => OrderDirection::Asc,
664 },
665 ..item.clone()
666 }).collect()
667 } else {
668 self.display_order.clone()
669 };
670
671 let selection = Selection {
672 predicate: predicate.clone(),
673 order_by: Some(order_by),
674 limit: Some((limit + 1) as u64), };
676
677 let first_ts = current.items.first().and_then(|i| i.entity().value("timestamp"));
679 let last_ts = current.items.last().and_then(|i| i.entity().value("timestamp"));
680 tracing::trace!(
681 "[slide_window] cursor_index={}, oldest_vis={}, newest_vis={}, max={}, limit={}, first_ts={:?}, last_ts={:?}",
682 cursor_index, oldest_visible_index, newest_visible_index, max_index, limit, first_ts, last_ts
683 );
684 tracing::debug!("[slide_window] update_selection: {}", selection);
685
686 if let Err(e) = self.livequery.update_selection(selection) {
687 tracing::error!("[slide_window] FAILED to update selection: {}", e);
688 }
689 }
690
691 fn build_cursor_predicate(
693 &self,
694 current: &VisibleSet<V>,
695 cursor_index: usize,
696 operator: ComparisonOperator,
697 ) -> Predicate {
698 let Some(cursor_item) = current.items.get(cursor_index) else {
699 return self.predicate.clone();
700 };
701 let Some(order_item) = self.display_order.first() else {
702 return self.predicate.clone();
703 };
704 let field_name = order_item.path.first();
705 let Some(cursor_value) = cursor_item.entity().value(field_name) else {
706 return self.predicate.clone();
707 };
708
709 tracing::trace!(
711 "[build_cursor_predicate] cursor_index={}, entity_id={}, field={}, value={:?}",
712 cursor_index,
713 cursor_item.entity().id(),
714 field_name,
715 cursor_value
716 );
717
718 let cursor_predicate = Predicate::Comparison {
719 left: Box::new(Expr::Path(PathExpr::simple(field_name))),
720 operator,
721 right: Box::new(Expr::Literal(value_to_literal(&cursor_value))),
722 };
723
724 Predicate::And(
725 Box::new(self.predicate.clone()),
726 Box::new(cursor_predicate),
727 )
728 }
729}
730
731pub fn parse_order_by(s: &str) -> Result<Vec<OrderByItem>, String> {
736 use ankql::parser::parse_selection;
737 let selection_str = format!("true ORDER BY {}", s);
738 let selection =
739 parse_selection(&selection_str).map_err(|e| format!("Failed to parse ORDER BY: {}", e))?;
740 selection
741 .order_by
742 .ok_or_else(|| "No ORDER BY parsed".to_string())
743}
744
/// Conversion into a list of ORDER BY items, accepted by
/// [`ScrollManager::new`] for its `display_order` argument.
pub trait IntoOrderBy {
    /// Performs the conversion, returning a message on failure.
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String>;
}
748
/// String slices are parsed with [`parse_order_by`].
impl IntoOrderBy for &str {
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        parse_order_by(self)
    }
}
754
/// Already-built order items are passed through unchanged.
impl IntoOrderBy for Vec<OrderByItem> {
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        Ok(self)
    }
}
760
761pub use ankurah_virtual_scroll_derive::generate_scroll_manager;