pub mod windowing;
use ankql::ast::{
ComparisonOperator, Expr, Literal, OrderByItem, OrderDirection, PathExpr, Predicate, Selection,
};
use ankurah::changes::ChangeSet;
use ankurah::core::selection::filter::Filterable;
use ankurah::core::value::Value;
use ankurah::{model::View, Context, LiveQuery};
use ankurah_proto::EntityId;
use ankurah_signals::{Mut, Peek, Read, Subscribe};
pub use ankql::ast::{OrderByItem as OrderBy, Predicate as Filter};
pub use ankurah_proto::EntityId as Id;
pub use ankurah_signals;
/// Snapshot of the currently loaded window of items, published to consumers
/// through the scroll manager's `visible_set` signal.
#[derive(Clone, Debug)]
pub struct VisibleSet<V> {
    /// The loaded items of the window.
    pub items: Vec<V>,
    /// Anchor information produced when a window slide completed and the
    /// anchor item was found in the new result; `None` otherwise.
    pub intersection: Option<Intersection>,
    /// Whether more items are believed to exist before the first loaded item.
    pub has_more_preceding: bool,
    /// Whether more items are believed to exist after the last loaded item.
    pub has_more_following: bool,
    /// Whether the consumer should keep the view pinned automatically
    /// (the manager sets this to `true` while it is in `ScrollMode::Live`).
    pub should_auto_scroll: bool,
    /// Human-readable description of a failure detected while rebuilding the
    /// window, if any.
    pub error: Option<String>,
}
impl<V> Default for VisibleSet<V> {
    /// Returns an empty window: no items, no intersection, no error,
    /// `has_more_preceding` and `should_auto_scroll` set, and
    /// `has_more_following` cleared.
    fn default() -> Self {
        let items = Vec::new();
        Self {
            items,
            intersection: None,
            error: None,
            has_more_preceding: true,
            has_more_following: false,
            should_auto_scroll: true,
        }
    }
}
/// Identifies an item that is present both before and after a window slide,
/// so a consumer can relate the new item list to the previous one.
#[derive(Clone, Debug)]
pub struct Intersection {
    /// Id of the anchor item.
    pub entity_id: EntityId,
    /// Position of the anchor item inside the new `VisibleSet::items`.
    pub index: usize,
    /// Direction of the slide that produced this intersection.
    pub direction: LoadDirection,
}
/// Direction in which the loaded window is being extended.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadDirection {
    /// Loading items that precede the current window.
    Backward,
    /// Loading items that follow the current window.
    Forward,
}
/// Bookkeeping for a window slide whose query has been issued but whose
/// result has not yet been folded into the visible set.
#[derive(Clone, Debug)]
struct PendingSlide {
    /// Id of the item the cursor predicate was built from. Currently only
    /// stored, never read, hence the lint allowance.
    #[allow(dead_code)]
    continuation: EntityId,
    /// Id of the item expected to appear in the new result; used to compute
    /// the `Intersection`.
    anchor: EntityId,
    /// Number of items the slide wants to keep. A result longer than this is
    /// trimmed and taken as a sign that more items exist in that direction.
    limit: usize,
    /// Direction of the slide.
    direction: LoadDirection,
    /// Whether the query was issued with the display order inverted.
    reversed_order: bool,
}
/// High-level state of the scroll manager.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScrollMode {
    /// Following the live end of the list; auto-scroll is enabled.
    Live,
    /// The user scrolled away from the live end or is paginating backward.
    Backward,
    /// The user is paginating forward.
    Forward,
}
/// Diagnostic counters refreshed on every handled scroll event.
#[derive(Clone, Debug, Default)]
pub struct ScrollDebugInfo {
    /// Number of loaded items before the first visible item.
    pub items_above: usize,
    /// Number of loaded items after the last visible item.
    pub items_below: usize,
    /// Item count at or below which pagination is triggered.
    pub trigger_threshold: usize,
    /// Index of the first visible item within the loaded window.
    pub first_visible_index: usize,
    /// Index of the last visible item within the loaded window.
    pub last_visible_index: usize,
    /// Number of window slides requested so far.
    pub update_count: u32,
    /// Whether a window slide is still awaiting its result.
    pub update_pending: bool,
}
/// Converts a stored entity `Value` into the query-language `Literal` used
/// in cursor predicates.
///
/// Integer, float, bool and string values map to the literal variant of the
/// same name; every other value is rendered through its `Debug`
/// representation and wrapped in `Literal::String`.
fn value_to_literal(value: &Value) -> Literal {
    match *value {
        Value::I16(n) => Literal::I16(n),
        Value::I32(n) => Literal::I32(n),
        Value::I64(n) => Literal::I64(n),
        Value::F64(x) => Literal::F64(x),
        Value::Bool(flag) => Literal::Bool(flag),
        Value::String(ref text) => Literal::String(text.clone()),
        // No dedicated literal variant is used for the remaining values.
        ref other => Literal::String(format!("{:?}", other)),
    }
}
/// Drives a windowed, bidirectionally paginated view over a live query.
///
/// The manager owns the live query, republishes its results as a
/// [`VisibleSet`], and re-targets the query when scroll events indicate that
/// the loaded window is running out of items in the scroll direction.
pub struct ScrollManager<V: View + Clone + Send + Sync + 'static> {
    /// The live query whose selection is updated as the window slides.
    livequery: LiveQuery<V>,
    /// The caller-supplied base predicate; cursor conditions are AND-ed onto it.
    predicate: Predicate,
    /// The caller-supplied ordering used for display.
    display_order: Vec<OrderByItem>,
    /// The published window of items.
    visible_set: Mut<VisibleSet<V>>,
    /// Current scroll mode.
    mode: Mut<ScrollMode>,
    /// Set once `start` has published the initial window; the subscription
    /// callback ignores change sets until then.
    initialized: Mut<bool>,
    /// The slide whose result is still outstanding, if any.
    pending: Mut<Option<PendingSlide>>,
    /// Oldest visible item at the time of the last slide; used for debouncing.
    last_trigger_oldest_visible: Mut<Option<EntityId>>,
    /// Diagnostic counters, refreshed by `on_scroll`.
    debug_info: Mut<ScrollDebugInfo>,
    /// Number of slides requested so far.
    update_count: std::sync::atomic::AtomicU32,
    /// Minimum row height passed to the windowing helpers.
    minimum_row_height: u32,
    /// Buffer factor (clamped to at least 2.0 at construction).
    buffer_factor: f64,
    /// Viewport height passed to the windowing helpers.
    viewport_height: u32,
    /// Keeps the live-query subscription alive for the manager's lifetime.
    _subscription: ankurah_signals::SubscriptionGuard,
}
impl<V: View + Clone + Send + Sync + 'static> ScrollManager<V> {
/// Creates a scroll manager over `predicate`, ordered by `display_order`.
///
/// The constructor converts the predicate and ordering, clamps
/// `buffer_factor` to at least 2.0, derives the initial query limit from the
/// `windowing` helpers, issues the live query, and subscribes a callback that
/// rebuilds the published `VisibleSet` whenever the query result changes.
/// The callback stays inert until `start` has marked the manager initialized.
///
/// # Errors
/// Returns the error produced by `ctx.query` if the live query cannot be
/// created.
///
/// # Panics
/// Panics if `predicate` or `display_order` cannot be converted
/// ("Failed to parse predicate" / "Failed to parse order").
pub fn new(
ctx: &Context,
predicate: impl TryInto<Predicate, Error = impl std::fmt::Debug>,
display_order: impl IntoOrderBy,
minimum_row_height: u32,
buffer_factor: f64,
viewport_height: u32,
) -> Result<Self, ankurah::error::RetrievalError> {
let predicate = predicate.try_into().expect("Failed to parse predicate");
let display_order = display_order
.into_order_by()
.expect("Failed to parse order");
// Buffer factors below 2.0 are raised to 2.0.
let buffer_factor = buffer_factor.max(2.0);
let screen_items = windowing::screen_items(viewport_height, minimum_row_height);
let threshold = buffer_factor / 2.0;
let limit = windowing::live_window_size(screen_items, threshold);
// Initial selection: the base predicate in display order, capped at the live window size.
let selection = Selection {
predicate: predicate.clone(),
order_by: Some(display_order.clone()),
limit: Some(limit as u64),
};
let livequery: LiveQuery<V> = ctx.query(selection)?;
let visible_set: Mut<VisibleSet<V>> = Mut::new(VisibleSet::default());
let pending: Mut<Option<PendingSlide>> = Mut::new(None);
let last_trigger_oldest_visible: Mut<Option<EntityId>> = Mut::new(None);
let mode: Mut<ScrollMode> = Mut::new(ScrollMode::Live);
let initialized: Mut<bool> = Mut::new(false);
let debug_info: Mut<ScrollDebugInfo> = Mut::new(ScrollDebugInfo {
trigger_threshold: screen_items,
..Default::default()
});
// Only the first ORDER BY item decides whether results need reversing for display.
let is_desc = display_order
.first()
.map(|o| o.direction == OrderDirection::Desc)
.unwrap_or(false);
// Clones of the shared signals that are moved into the subscription callback.
let visible_set_clone = visible_set.clone();
let pending_clone = pending.clone();
let mode_clone = mode.clone();
let initialized_clone = initialized.clone();
let subscription = livequery.subscribe(move |changeset: ChangeSet<V>| {
tracing::trace!("[subscription] CALLBACK FIRED");
// Ignore change sets until `start` has published the initial window.
if !initialized_clone.peek() {
tracing::debug!("[subscription] skipping - not yet initialized");
return;
}
let current = visible_set_clone.peek();
let mut items: Vec<V> = changeset.resultset.peek();
tracing::trace!("[subscription] processing {} items, current has {}", items.len(), current.items.len());
// A pending slide is consumed (and cleared) only once the result set reports itself loaded.
let pending_slide = pending_clone.peek();
let should_process_slide = pending_slide.is_some() && changeset.resultset.is_loaded();
let slide = if should_process_slide {
pending_clone.set(None);
pending_slide
} else {
None
};
// Results of a DESC display order are reversed, unless this result belongs to a
// slide that was queried with the order already inverted.
let used_reversed_order = slide.as_ref().map(|s| s.reversed_order).unwrap_or(false);
if is_desc && !used_reversed_order {
items.reverse();
}
let (has_more_preceding, has_more_following, intersection, error) = if let Some(ref slide) = slide {
// A result longer than the slide's limit means more items exist in the slide
// direction; the surplus item is dropped from the corresponding end.
let (has_more_preceding, has_more_following) = match slide.direction {
LoadDirection::Backward => {
let more_older = if items.len() > slide.limit {
items.remove(0); true
} else {
false
};
// After a backward slide `has_more_following` is always reported as true.
(more_older, true) }
LoadDirection::Forward => {
let more_newer = if items.len() > slide.limit {
items.pop(); true
} else {
// No surplus item: the forward slide reached the end, so switch back to Live mode.
mode_clone.set(ScrollMode::Live);
false
};
// Preceding items exist if they already did, or if the first item changed.
let more_older = current.has_more_preceding ||
current.items.first().map(|old| items.first().map(|new|
old.entity().id() != new.entity().id()
).unwrap_or(false)).unwrap_or(false);
(more_older, more_newer)
}
};
tracing::trace!(
"[subscription] Looking for anchor {:?} in {} items",
slide.anchor, items.len()
);
// Locate the anchor item in the new result to report the intersection.
let (intersection, error) = match items.iter().position(|item| item.entity().id() == slide.anchor) {
Some(index) => {
// The "timestamp" value is read for trace output only.
let anchor_ts = items.get(index).and_then(|i| i.entity().value("timestamp"));
tracing::trace!(
"[subscription] INTERSECTION: anchor {:?} (ts={:?}) found at index {}",
slide.anchor, anchor_ts, index
);
(
Some(Intersection {
entity_id: slide.anchor,
index,
direction: slide.direction,
}),
None
)
},
None => {
// A missing anchor is tolerated for forward slides but reported as an error
// for backward slides.
if slide.direction == LoadDirection::Forward {
tracing::trace!("[subscription] Forward slide: no overlap, jumping to live");
(None, None)
} else {
tracing::error!(
"[subscription] INTERSECTION FAILED: anchor {:?} not found in {} items",
slide.anchor, items.len()
);
(None, Some(format!(
"Intersection failed: anchor {} not found in result",
slide.anchor
)))
}
}
};
(has_more_preceding, has_more_following, intersection, error)
} else {
// Not a slide result: keep the previous has-more flags.
(current.has_more_preceding, current.has_more_following, None, None)
};
tracing::trace!(
"[subscription] visible_set: items={}, has_more_preceding={}, has_more_following={}",
items.len(), has_more_preceding, has_more_following
);
// Publish the rebuilt window; auto-scroll is enabled only in Live mode.
visible_set_clone.set(VisibleSet {
items,
intersection,
has_more_preceding,
has_more_following,
should_auto_scroll: mode_clone.peek() == ScrollMode::Live,
error,
});
});
Ok(Self {
livequery,
predicate,
display_order,
visible_set,
mode,
initialized,
pending,
last_trigger_oldest_visible,
debug_info,
update_count: std::sync::atomic::AtomicU32::new(0),
minimum_row_height,
buffer_factor,
viewport_height,
_subscription: subscription,
})
}
/// Waits for the live query to initialize, publishes the initial window and
/// marks the manager as initialized so that subsequent change sets are
/// processed by the subscription callback.
///
/// Items are reversed when the first ORDER BY item is descending.
/// `has_more_preceding` is set when the initial result contains at least a
/// full live window of items.
pub async fn start(&self) {
    self.livequery.wait_initialized().await;

    let descending = matches!(
        self.display_order.first(),
        Some(primary) if primary.direction == OrderDirection::Desc
    );
    let mut items: Vec<V> = self.livequery.peek();
    if descending {
        items.reverse();
    }

    // A full window suggests the query limit cut the result short.
    let has_more_preceding = items.len() >= self.live_window_size();
    tracing::debug!(
        "[start] initial visible_set: items={}, has_more_preceding={}",
        items.len(), has_more_preceding
    );

    let initial = VisibleSet {
        items,
        intersection: None,
        has_more_preceding,
        has_more_following: false,
        should_auto_scroll: true,
        error: None,
    };
    self.visible_set.set(initial);
    self.initialized.set(true);
}
fn threshold(&self) -> f64 {
self.buffer_factor / 2.0
}
fn screen_items(&self) -> usize {
windowing::screen_items(self.viewport_height, self.minimum_row_height)
}
/// Returns `windowing::live_window_size` for the current screen item count
/// and threshold.
fn live_window_size(&self) -> usize {
    let per_screen = self.screen_items();
    let trigger = self.threshold();
    windowing::live_window_size(per_screen, trigger)
}
/// Returns a read handle to the published [`VisibleSet`] signal.
pub fn visible_set(&self) -> Read<VisibleSet<V>> {
    let window = &self.visible_set;
    window.read()
}
/// Returns the current [`ScrollMode`] without subscribing to it.
pub fn mode(&self) -> ScrollMode {
    let mode = &self.mode;
    mode.peek()
}
/// Returns the live query's current selection rendered through its
/// `Display` implementation; the selection's version is ignored.
pub fn current_selection(&self) -> String {
    let (selection, _version) = self.livequery.selection().peek();
    selection.to_string()
}
/// Returns a read handle to the [`ScrollDebugInfo`] signal.
pub fn debug_info(&self) -> Read<ScrollDebugInfo> {
    let diagnostics = &self.debug_info;
    diagnostics.read()
}
/// Handles a scroll event reported by the UI.
///
/// * `first_visible` / `last_visible` - ids of the first and last items
///   currently visible; both must be present in the published window,
///   otherwise the event is ignored with a warning.
/// * `scrolling_backward` - when `true` only the backward pagination
///   threshold is evaluated, otherwise only the forward one.
///
/// The method refreshes the debug info, switches between `Live` and
/// non-live modes depending on whether items lie below the viewport, and
/// requests a window slide when the number of buffered items in the scroll
/// direction is at most one screen and more items exist in that direction.
pub fn on_scroll(&self, first_visible: EntityId, last_visible: EntityId, scrolling_backward: bool) {
    let current = self.visible_set.peek();
    let screen = self.screen_items();
    tracing::trace!(
        "[on_scroll] window: items={}, has_more_preceding={}, has_more_following={}",
        current.items.len(), current.has_more_preceding, current.has_more_following
    );

    // Translate the reported entity ids into indices of the loaded window.
    let first_idx = current.items.iter().position(|item| item.entity().id() == first_visible);
    let last_idx = current.items.iter().position(|item| item.entity().id() == last_visible);
    let (first_visible_index, last_visible_index) = match (first_idx, last_idx) {
        (Some(f), Some(l)) => (f, l),
        _ => {
            tracing::warn!(
                "[on_scroll] EARLY RETURN: EntityId not found! first_idx={:?}, last_idx={:?}",
                first_idx, last_idx
            );
            return;
        }
    };

    let items_above = first_visible_index;
    let items_below = current.items.len().saturating_sub(last_visible_index + 1);
    self.debug_info.set(ScrollDebugInfo {
        items_above,
        items_below,
        trigger_threshold: screen,
        first_visible_index,
        last_visible_index,
        update_count: self.update_count.load(std::sync::atomic::Ordering::Relaxed),
        update_pending: self.pending.peek().is_some(),
    });
    tracing::trace!(
        "[on_scroll] indices: first={}, last={}, above={}, below={}",
        first_visible_index, last_visible_index, items_above, items_below
    );

    // Leaving Live mode: something is now below the viewport.
    if self.mode.peek() == ScrollMode::Live && items_below > 0 {
        tracing::debug!("[on_scroll] Exiting Live mode (item scrolled off bottom, items_below={})", items_below);
        self.mode.set(ScrollMode::Backward);
        let mut updated = current.clone();
        updated.should_auto_scroll = false;
        self.visible_set.set(updated);
    }

    // Re-entering Live mode: nothing below the viewport and nothing more to load.
    let at_bottom = !current.has_more_following && items_below == 0;
    if self.mode.peek() != ScrollMode::Live && at_bottom {
        tracing::debug!("[on_scroll] Re-entering Live mode (scrolled to bottom)");
        self.mode.set(ScrollMode::Live);
        let mut updated = current.clone();
        updated.should_auto_scroll = true;
        self.visible_set.set(updated);
    }

    let backward_threshold = scrolling_backward && items_above <= screen && current.has_more_preceding;
    let forward_threshold = !scrolling_backward && items_below <= screen && current.has_more_following;
    if backward_threshold {
        tracing::debug!("[on_scroll] TRIGGERING BACKWARD PAGINATION");
        self.mode.set(ScrollMode::Backward);
        self.slide_window(&current, first_visible_index, last_visible_index, LoadDirection::Backward);
    } else if forward_threshold {
        tracing::debug!("[on_scroll] TRIGGERING FORWARD PAGINATION");
        self.mode.set(ScrollMode::Forward);
        self.slide_window(&current, first_visible_index, last_visible_index, LoadDirection::Forward);
    }
}
/// Re-targets the live query so that the loaded window slides in
/// `direction`, keeping the currently visible items plus a buffer of two
/// screens on either side.
///
/// * `current` - the window the indices refer to.
/// * `oldest_visible_index` / `newest_visible_index` - indices of the first
///   and last visible items within `current.items`.
/// * `direction` - which way to slide.
///
/// The request is debounced: if the oldest visible item has moved fewer than
/// one screen of items since the last slide (and the previous marker is
/// still in the window), nothing happens. Otherwise a `PendingSlide` is
/// registered *before* the selection is updated, so the subscription
/// callback can recognise the result. If updating the selection fails, the
/// pending slide is cleared again so that a later, unrelated change set is
/// not misinterpreted as the slide's result.
///
/// # Panics
/// Panics if the cursor or anchor index is not inside `current.items`
/// (for example when the window is empty).
fn slide_window(
    &self,
    current: &VisibleSet<V>,
    oldest_visible_index: usize,
    newest_visible_index: usize,
    direction: LoadDirection,
) {
    let screen = self.screen_items();
    let buffer = 2 * screen;
    let max_index = current.items.len().saturating_sub(1);

    // Backward: cursor sits `buffer` items past the newest visible item and the query
    // keeps everything at or before it. Forward: cursor sits `buffer` items before the
    // oldest visible item, the query keeps everything at or after it and is issued with
    // the display order inverted.
    let (cursor_index, intersection_index, operator, reversed_order) = match direction {
        LoadDirection::Backward => (
            (newest_visible_index + buffer).min(max_index),
            newest_visible_index,
            ComparisonOperator::LessThanOrEqual,
            false,
        ),
        LoadDirection::Forward => (
            oldest_visible_index.saturating_sub(buffer),
            oldest_visible_index,
            ComparisonOperator::GreaterThanOrEqual,
            true,
        ),
    };

    let visible_span = newest_visible_index.saturating_sub(oldest_visible_index) + 1;
    let limit = visible_span + 2 * buffer;
    let continuation = current.items.get(cursor_index)
        .map(|item| item.entity().id())
        .expect("cursor item must exist");
    let anchor = current.items.get(intersection_index)
        .map(|item| item.entity().id())
        .expect("anchor item must exist");

    // Debounce: require at least one screen of movement since the last slide.
    let threshold = screen;
    let oldest_visible_entity = current.items.get(oldest_visible_index)
        .map(|item| item.entity().id());
    tracing::trace!(
        "[slide_window] DEBOUNCE CHECK: oldest_visible_idx={}, oldest_visible_entity={:?}, array_len={}",
        oldest_visible_index, oldest_visible_entity, current.items.len()
    );
    if let Some(last_oldest) = self.last_trigger_oldest_visible.peek() {
        tracing::trace!(
            "[slide_window] last_trigger_oldest_visible={:?}",
            last_oldest
        );
        let last_idx = current.items.iter()
            .position(|item| item.entity().id() == last_oldest);
        tracing::trace!(
            "[slide_window] last_oldest found at index: {:?}",
            last_idx
        );
        if let Some(l_idx) = last_idx {
            let distance = oldest_visible_index.abs_diff(l_idx);
            tracing::trace!(
                "[slide_window] distance={}, threshold={} (l_idx={}, oldest_visible_idx={})",
                distance, threshold, l_idx, oldest_visible_index
            );
            if distance < threshold {
                tracing::trace!(
                    "[slide_window] DEBOUNCE: scrolled {} items < threshold {}, SKIPPING",
                    distance, threshold
                );
                return;
            }
            tracing::trace!(
                "[slide_window] ALLOWING: scrolled {} items >= threshold {}",
                distance, threshold
            );
        } else {
            tracing::trace!(
                "[slide_window] ALLOWING: last oldest_visible {:?} NOT FOUND in array of {} items (window shifted)",
                last_oldest, current.items.len()
            );
        }
    } else {
        tracing::trace!(
            "[slide_window] ALLOWING: no last_trigger_oldest_visible (first trigger)"
        );
    }
    if let Some(entity) = oldest_visible_entity {
        tracing::trace!(
            "[slide_window] Setting last_trigger_oldest_visible = {:?}",
            entity
        );
        self.last_trigger_oldest_visible.set(Some(entity));
    }

    self.update_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    // Register the slide before touching the query so the subscription callback can
    // match the incoming result to it.
    self.pending.set(Some(PendingSlide {
        continuation,
        anchor,
        limit,
        direction,
        reversed_order,
    }));

    let predicate = self.build_cursor_predicate(current, cursor_index, operator);
    let order_by = if reversed_order {
        self.display_order.iter().map(|item| OrderByItem {
            direction: match item.direction {
                OrderDirection::Asc => OrderDirection::Desc,
                OrderDirection::Desc => OrderDirection::Asc,
            },
            ..item.clone()
        }).collect()
    } else {
        self.display_order.clone()
    };
    let selection = Selection {
        predicate,
        order_by: Some(order_by),
        // One extra row lets the callback detect whether more items exist.
        limit: Some((limit + 1) as u64),
    };

    // The "timestamp" values are read for trace output only.
    let first_ts = current.items.first().and_then(|i| i.entity().value("timestamp"));
    let last_ts = current.items.last().and_then(|i| i.entity().value("timestamp"));
    tracing::trace!(
        "[slide_window] cursor_index={}, oldest_vis={}, newest_vis={}, max={}, limit={}, first_ts={:?}, last_ts={:?}",
        cursor_index, oldest_visible_index, newest_visible_index, max_index, limit, first_ts, last_ts
    );
    tracing::debug!("[slide_window] update_selection: {}", selection);
    if let Err(e) = self.livequery.update_selection(selection) {
        tracing::error!("[slide_window] FAILED to update selection: {}", e);
        // The query was not re-targeted, so no slide result will arrive: drop the
        // pending slide instead of letting the next change set be treated as one.
        self.pending.set(None);
    }
}
/// Builds the predicate for a slide query: the base predicate AND-ed with a
/// comparison of the first ORDER BY field against the cursor item's value.
///
/// Falls back to a clone of the base predicate when the cursor index is out
/// of range, when there is no ORDER BY item, or when the cursor item has no
/// value for the ordering field.
fn build_cursor_predicate(
    &self,
    current: &VisibleSet<V>,
    cursor_index: usize,
    operator: ComparisonOperator,
) -> Predicate {
    let base = &self.predicate;

    let (Some(row), Some(primary_sort)) =
        (current.items.get(cursor_index), self.display_order.first())
    else {
        return base.clone();
    };
    let column = primary_sort.path.first();
    let bound = match row.entity().value(column) {
        Some(found) => found,
        None => return base.clone(),
    };

    tracing::trace!(
        "[build_cursor_predicate] cursor_index={}, entity_id={}, field={}, value={:?}",
        cursor_index,
        row.entity().id(),
        column,
        bound
    );

    let lhs = Expr::Path(PathExpr::simple(column));
    let rhs = Expr::Literal(value_to_literal(&bound));
    let comparison = Predicate::Comparison {
        left: Box::new(lhs),
        operator,
        right: Box::new(rhs),
    };
    Predicate::And(Box::new(base.clone()), Box::new(comparison))
}
}
/// Parses an ORDER BY clause body (the text following `ORDER BY`) into a
/// list of order items.
///
/// The text is embedded into the selection `true ORDER BY <s>` and run
/// through the AnkQL selection parser.
///
/// # Errors
/// Returns a message if the selection fails to parse or if the parsed
/// selection carries no ORDER BY clause.
pub fn parse_order_by(s: &str) -> Result<Vec<OrderByItem>, String> {
    let query = format!("true ORDER BY {}", s);
    match ankql::parser::parse_selection(&query) {
        Ok(parsed) => match parsed.order_by {
            Some(order) => Ok(order),
            None => Err("No ORDER BY parsed".to_string()),
        },
        Err(e) => Err(format!("Failed to parse ORDER BY: {}", e)),
    }
}
/// Conversion into a list of ORDER BY items, so the scroll manager's
/// constructor can accept several representations of an ordering.
pub trait IntoOrderBy {
    /// Consumes `self` and returns the order items, or a message describing
    /// why the conversion failed.
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String>;
}
impl IntoOrderBy for &str {
    /// Parses the string as an ORDER BY clause body via `parse_order_by`.
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        let clause: &str = self;
        parse_order_by(clause)
    }
}
impl IntoOrderBy for Vec<OrderByItem> {
    /// Returns the already-parsed order items unchanged; never fails.
    fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
        let already_parsed = self;
        Ok(already_parsed)
    }
}
pub use ankurah_virtual_scroll_derive::generate_scroll_manager;