pub mod windowing;
use ankql::ast::{
ComparisonOperator, Expr, Literal, OrderByItem, OrderDirection, PathExpr, Predicate, Selection,
};
use ankurah::changes::ChangeSet;
use ankurah::core::selection::filter::Filterable;
use ankurah::core::value::Value;
use ankurah::{model::View, Context, LiveQuery};
use ankurah_proto::EntityId;
use ankurah_signals::{Mut, Peek, Read, Subscribe};
pub use ankql::ast::{OrderByItem as OrderBy, Predicate as Filter};
pub use ankurah_proto::EntityId as Id;
pub use ankurah_signals;
#[derive(Clone, Debug)]
pub struct VisibleSet<V> {
pub items: Vec<V>,
pub intersection: Option<Intersection>,
pub has_more_preceding: bool,
pub has_more_following: bool,
pub should_auto_scroll: bool,
pub error: Option<String>,
}
impl<V> Default for VisibleSet<V> {
fn default() -> Self {
Self {
items: Vec::new(),
intersection: None,
has_more_preceding: true,
has_more_following: false,
should_auto_scroll: true,
error: None,
}
}
}
#[derive(Clone, Debug)]
pub struct Intersection {
pub entity_id: EntityId,
pub index: usize,
pub direction: LoadDirection,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadDirection {
Backward,
Forward,
}
#[derive(Clone, Debug)]
struct PendingSlide {
continuation: EntityId,
limit: usize,
direction: LoadDirection,
reversed_order: bool,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ScrollMode {
Live, Backward, Forward, }
fn value_to_literal(value: &Value) -> Literal {
match value {
Value::I16(v) => Literal::I16(*v),
Value::I32(v) => Literal::I32(*v),
Value::I64(v) => Literal::I64(*v),
Value::F64(v) => Literal::F64(*v),
Value::Bool(v) => Literal::Bool(*v),
Value::String(v) => Literal::String(v.clone()),
_ => Literal::String(format!("{:?}", value)),
}
}
pub struct ScrollManager<V: View + Clone + Send + Sync + 'static> {
livequery: LiveQuery<V>,
predicate: Predicate,
display_order: Vec<OrderByItem>,
visible_set: Mut<VisibleSet<V>>,
mode: Mut<ScrollMode>,
pending: Mut<Option<PendingSlide>>,
minimum_row_height: u32,
buffer_factor: f64,
viewport_height: u32,
_subscription: ankurah_signals::SubscriptionGuard,
}
impl<V: View + Clone + Send + Sync + 'static> ScrollManager<V> {
pub fn new(
ctx: &Context,
predicate: impl TryInto<Predicate, Error = impl std::fmt::Debug>,
display_order: impl IntoOrderBy,
minimum_row_height: u32,
buffer_factor: f64,
viewport_height: u32,
) -> Result<Self, ankurah::error::RetrievalError> {
let predicate = predicate.try_into().expect("Failed to parse predicate");
let display_order = display_order
.into_order_by()
.expect("Failed to parse order");
let buffer_factor = buffer_factor.max(2.0);
let screen_items = windowing::screen_items(viewport_height, minimum_row_height);
let threshold = buffer_factor / 2.0;
let limit = windowing::live_window_size(screen_items, threshold);
let selection = Selection {
predicate: predicate.clone(),
order_by: Some(display_order.clone()),
limit: Some(limit as u64),
};
let livequery: LiveQuery<V> = ctx.query(selection)?;
let visible_set: Mut<VisibleSet<V>> = Mut::new(VisibleSet::default());
let pending: Mut<Option<PendingSlide>> = Mut::new(None);
let mode: Mut<ScrollMode> = Mut::new(ScrollMode::Live);
let is_desc = display_order
.first()
.map(|o| o.direction == OrderDirection::Desc)
.unwrap_or(false);
let visible_set_clone = visible_set.clone();
let pending_clone = pending.clone();
let mode_clone = mode.clone();
let subscription = livequery.subscribe(move |changeset: ChangeSet<V>| {
let current = visible_set_clone.peek();
if current.items.is_empty() && !changeset.resultset.peek().is_empty() {
return;
}
let mut items: Vec<V> = changeset.resultset.peek();
let slide = pending_clone.peek();
pending_clone.set(None);
let used_reversed_order = slide.as_ref().map(|s| s.reversed_order).unwrap_or(false);
if is_desc && !used_reversed_order {
items.reverse();
}
let (has_more_preceding, has_more_following, intersection, error) = if let Some(ref slide) = slide {
let (has_more_preceding, has_more_following) = match slide.direction {
LoadDirection::Backward => {
let more_older = if items.len() > slide.limit {
items.remove(0); true
} else {
false
};
(more_older, true) }
LoadDirection::Forward => {
let more_newer = if items.len() > slide.limit {
items.pop(); true
} else {
mode_clone.set(ScrollMode::Live);
false
};
let more_older = current.has_more_preceding ||
current.items.first().map(|old| items.first().map(|new|
old.entity().id() != new.entity().id()
).unwrap_or(false)).unwrap_or(false);
(more_older, more_newer)
}
};
let (intersection, error) = match items.iter().position(|item| item.entity().id() == slide.continuation) {
Some(index) => (
Some(Intersection {
entity_id: slide.continuation,
index,
direction: slide.direction,
}),
None
),
None => {
if slide.direction == LoadDirection::Forward {
tracing::debug!("Forward slide: no overlap, jumping to live");
(None, None)
} else {
(None, Some(format!(
"Intersection failed: {} not found in result",
slide.continuation
)))
}
}
};
(has_more_preceding, has_more_following, intersection, error)
} else {
(current.has_more_preceding, current.has_more_following, None, None)
};
visible_set_clone.set(VisibleSet {
items,
intersection,
has_more_preceding,
has_more_following,
should_auto_scroll: mode_clone.peek() == ScrollMode::Live,
error,
});
});
Ok(Self {
livequery,
predicate,
display_order,
visible_set,
mode,
pending,
minimum_row_height,
buffer_factor,
viewport_height,
_subscription: subscription,
})
}
pub async fn start(&self) {
self.livequery.wait_initialized().await;
let mut items: Vec<V> = self.livequery.peek();
let is_desc = self
.display_order
.first()
.map(|o| o.direction == OrderDirection::Desc)
.unwrap_or(false);
if is_desc {
items.reverse();
}
let live_window = self.live_window_size();
let has_more_preceding = items.len() >= live_window;
self.visible_set.set(VisibleSet {
items,
intersection: None,
has_more_preceding,
has_more_following: false,
should_auto_scroll: true,
error: None,
});
}
fn threshold(&self) -> f64 {
self.buffer_factor / 2.0
}
fn screen_items(&self) -> usize {
windowing::screen_items(self.viewport_height, self.minimum_row_height)
}
fn live_window_size(&self) -> usize {
windowing::live_window_size(self.screen_items(), self.threshold())
}
pub fn visible_set(&self) -> Read<VisibleSet<V>> {
self.visible_set.read()
}
pub fn mode(&self) -> ScrollMode {
self.mode.peek()
}
pub fn current_selection(&self) -> String {
let (selection, _version) = self.livequery.selection().peek();
format!("{}", selection)
}
pub fn on_scroll(&self, first_visible: EntityId, last_visible: EntityId, scrolling_backward: bool) {
let current = self.visible_set.peek();
let screen = self.screen_items();
let first_idx = current.items.iter().position(|item| item.entity().id() == first_visible);
let last_idx = current.items.iter().position(|item| item.entity().id() == last_visible);
let (first_visible_index, last_visible_index) = match (first_idx, last_idx) {
(Some(f), Some(l)) => (f, l),
_ => return, };
let items_above = first_visible_index;
let items_below = current.items.len().saturating_sub(last_visible_index + 1);
tracing::debug!(
"on_scroll: first={}, last={}, items_above={}, items_below={}, screen={}, scrolling_backward={}, has_more_preceding={}",
first_visible_index, last_visible_index, items_above, items_below, screen, scrolling_backward, current.has_more_preceding
);
if scrolling_backward && items_above <= screen && current.has_more_preceding {
self.mode.set(ScrollMode::Backward);
self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Backward);
} else if !scrolling_backward && items_below <= screen && current.has_more_following {
self.mode.set(ScrollMode::Forward);
self.slide_window(¤t, first_visible_index, last_visible_index, LoadDirection::Forward);
}
}
fn slide_window(
&self,
current: &VisibleSet<V>,
oldest_visible_index: usize,
newest_visible_index: usize,
direction: LoadDirection,
) {
let buffer = 2 * self.screen_items(); let max_index = current.items.len().saturating_sub(1);
let (cursor_index, intersection_index, operator, reversed_order) = match direction {
LoadDirection::Backward => (
(newest_visible_index + buffer).min(max_index),
newest_visible_index,
ComparisonOperator::LessThanOrEqual,
false,
),
LoadDirection::Forward => (
if current.has_more_preceding {
oldest_visible_index.saturating_sub(buffer)
} else {
0
},
oldest_visible_index,
ComparisonOperator::GreaterThanOrEqual,
true,
),
};
let limit = (cursor_index.max(newest_visible_index) - cursor_index.min(oldest_visible_index) + 1) + buffer;
tracing::debug!(
"slide_window({:?}): visible=[{},{}], cursor={}, limit={}",
direction, oldest_visible_index, newest_visible_index, cursor_index, limit
);
let continuation = current.items.get(intersection_index)
.map(|item| item.entity().id())
.expect("intersection item must exist");
self.pending.set(Some(PendingSlide {
continuation,
limit,
direction,
reversed_order,
}));
let predicate = self.build_cursor_predicate(current, cursor_index, operator);
let order_by = if reversed_order {
self.display_order.iter().map(|item| OrderByItem {
direction: match item.direction {
OrderDirection::Asc => OrderDirection::Desc,
OrderDirection::Desc => OrderDirection::Asc,
},
..item.clone()
}).collect()
} else {
self.display_order.clone()
};
let selection = Selection {
predicate,
order_by: Some(order_by),
limit: Some((limit + 1) as u64), };
if let Err(e) = self.livequery.update_selection(selection) {
tracing::error!("Failed to update selection for {:?} slide: {}", direction, e);
}
}
fn build_cursor_predicate(
&self,
current: &VisibleSet<V>,
cursor_index: usize,
operator: ComparisonOperator,
) -> Predicate {
let Some(cursor_item) = current.items.get(cursor_index) else {
return self.predicate.clone();
};
let Some(order_item) = self.display_order.first() else {
return self.predicate.clone();
};
let field_name = order_item.path.first();
let Some(cursor_value) = cursor_item.entity().value(field_name) else {
return self.predicate.clone();
};
let cursor_predicate = Predicate::Comparison {
left: Box::new(Expr::Path(PathExpr::simple(field_name))),
operator,
right: Box::new(Expr::Literal(value_to_literal(&cursor_value))),
};
Predicate::And(
Box::new(self.predicate.clone()),
Box::new(cursor_predicate),
)
}
}
pub fn parse_order_by(s: &str) -> Result<Vec<OrderByItem>, String> {
use ankql::parser::parse_selection;
let selection_str = format!("true ORDER BY {}", s);
let selection =
parse_selection(&selection_str).map_err(|e| format!("Failed to parse ORDER BY: {}", e))?;
selection
.order_by
.ok_or_else(|| "No ORDER BY parsed".to_string())
}
pub trait IntoOrderBy {
fn into_order_by(self) -> Result<Vec<OrderByItem>, String>;
}
impl IntoOrderBy for &str {
fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
parse_order_by(self)
}
}
impl IntoOrderBy for Vec<OrderByItem> {
fn into_order_by(self) -> Result<Vec<OrderByItem>, String> {
Ok(self)
}
}
pub use ankurah_virtual_scroll_derive::generate_scroll_manager;