use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::client::timeline::recursive_bucket_visitor::{RecursiveBucketVisitor, RecursiveBucketVisitorCloseCallbackResult, RecursiveBucketVisitorOpenCallbackResult};
use crate::tools::buckets::{BucketLocation, BucketType};
use crate::tools::time::{DurationMillis, TimeMillis, MILLIS_IN_MONTH};
use crate::tools::types::Id;
use bytes::Bytes;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct SingleTimeline {
bucket_type: BucketType,
base_id: Id,
post_bundle_manager: Arc<dyn PostBundleManager>,
recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
oldest_allowed_post_bundle_time_millis: TimeMillis,
oldest_processed_post_bundle_time_millis: TimeMillis,
post_ids_already_seen: HashSet<Id>,
}
impl SingleTimeline {
pub fn new(bucket_type: BucketType, location_id: &Id, post_bundle_manager: Arc<dyn PostBundleManager>, recent_posts_pen: Arc<RwLock<RecentPostsPen>>) -> Self {
Self {
bucket_type,
base_id: *location_id,
post_bundle_manager,
recent_posts_pen,
oldest_allowed_post_bundle_time_millis: TimeMillis::MAX,
oldest_processed_post_bundle_time_millis: TimeMillis::MAX,
post_ids_already_seen: HashSet::new(),
}
}
pub fn bucket_type(&self) -> BucketType {
self.bucket_type
}
pub fn base_id(&self) -> Id {
self.base_id
}
pub async fn get_more_posts(&mut self, time_millis: TimeMillis, max_posts: usize, bucket_durations: &[DurationMillis]) -> anyhow::Result<Vec<(BucketLocation, Bytes, bool)>> {
let mut encoded_posts = Vec::new();
if TimeMillis::MAX == self.oldest_allowed_post_bundle_time_millis {
self.oldest_allowed_post_bundle_time_millis = time_millis;
}
if TimeMillis::MAX == self.oldest_processed_post_bundle_time_millis {
self.oldest_processed_post_bundle_time_millis = time_millis;
}
let time_millis_max = time_millis;
let time_millis_min = self.oldest_allowed_post_bundle_time_millis - MILLIS_IN_MONTH;
RecursiveBucketVisitor::visit(
time_millis_max,
time_millis_min,
bucket_durations,
&mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
self.oldest_allowed_post_bundle_time_millis = self.oldest_allowed_post_bundle_time_millis.min(bucket_time_millis);
let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
match encoded_post_bundle.header.overflowed {
true => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithChildren),
false => Ok(RecursiveBucketVisitorOpenCallbackResult::ContinueWithoutChildren),
}
},
&mut async |bucket_time_millis: TimeMillis, bucket_duration_millis: DurationMillis| {
self.oldest_processed_post_bundle_time_millis = self.oldest_processed_post_bundle_time_millis.min(bucket_time_millis);
let bucket_location = BucketLocation::new(self.bucket_type, self.base_id, bucket_duration_millis, bucket_time_millis)?;
let encoded_post_bundle = self.post_bundle_manager.get_post_bundle(&bucket_location, time_millis).await?;
let mut extraction_start_i = 0;
for i in 0..(encoded_post_bundle.header.num_posts as usize) {
let extraction_end_i = extraction_start_i + encoded_post_bundle.header.encoded_post_lengths[i];
let encoded_post_id = encoded_post_bundle.header.encoded_post_ids[i];
if self.post_ids_already_seen.insert(encoded_post_id) {
let encoded_post = encoded_post_bundle.encoded_posts_bytes.slice(extraction_start_i..extraction_end_i);
let healed = encoded_post_bundle.header.encoded_post_healed.contains(&encoded_post_id);
encoded_posts.push((bucket_location.clone(), encoded_post, healed));
}
extraction_start_i = extraction_end_i;
}
match encoded_posts.len() < max_posts {
true => Ok(RecursiveBucketVisitorCloseCallbackResult::Continue),
false => Ok(RecursiveBucketVisitorCloseCallbackResult::Stop),
}
},
)
.await?;
let pen_posts = self.recent_posts_pen.write().await.get_matching_posts(self.bucket_type, &self.base_id, &self.post_ids_already_seen, time_millis);
for (bucket_location, encoded_post_bytes, post_id) in pen_posts {
if self.post_ids_already_seen.insert(post_id) {
encoded_posts.push((bucket_location, encoded_post_bytes, false));
}
}
Ok(encoded_posts)
}
pub fn post_count(&self) -> usize {
self.post_ids_already_seen.len()
}
pub fn oldest_allowed_post_bundle_time_millis(&self) -> TimeMillis {
self.oldest_allowed_post_bundle_time_millis
}
pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
self.oldest_processed_post_bundle_time_millis
}
}
#[cfg(test)]
pub mod tests {
use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::client::timeline::single_timeline::SingleTimeline;
use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
use crate::tools::time::{TimeMillis, MILLIS_IN_DAY, MILLIS_IN_MONTH, MILLIS_IN_WEEK};
use crate::tools::types::Id;
use log::info;
use std::sync::Arc;
use tokio::sync::RwLock;
fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
Arc::new(RwLock::new(RecentPostsPen::new()))
}
#[tokio::test]
async fn timeline_single_test() -> anyhow::Result<()> {
let id = Id::random();
let stub_post_bundle_manager = StubPostBundleManager::default();
{
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
}
let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 5, &BUCKET_DURATIONS[1..]).await?;
assert_eq!(posts.len(), 23);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
Ok(())
}
#[tokio::test]
async fn timeline_multiple_test() -> anyhow::Result<()> {
let id = Id::random();
let stub_post_bundle_manager = StubPostBundleManager::default();
{
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 27)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1W", 17)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 6)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 22)?;
}
let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
info!("--- FETCH 1 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 22);
assert_eq!(single_timeline.post_count(), 22);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("3W")?);
info!("--- FETCH 2 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 23);
assert_eq!(single_timeline.post_count(), 22 + 23);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1W")?);
info!("--- FETCH 3 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 27);
assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
info!("--- FETCH 4 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 0);
assert_eq!(single_timeline.post_count(), 22 + 23 + 27);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
Ok(())
}
#[tokio::test]
async fn timeline_multiple_months_test() -> anyhow::Result<()> {
let id = Id::random();
let stub_post_bundle_manager = StubPostBundleManager::default();
{
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 23)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "2W", 5)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "3W", 16)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1M1D", 29)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M2W", 21)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_DAY, "1M2W", 7)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_WEEK, "1M3W", 6)?;
}
let stub_post_bundle_manager = Arc::new(stub_post_bundle_manager);
let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
info!("--- FETCH 1 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 34);
assert_eq!(single_timeline.post_count(), 34);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M2W")?);
info!("--- FETCH 2 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 29);
assert_eq!(single_timeline.post_count(), 34 + 29);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("1M")?);
info!("--- FETCH 3 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 21);
assert_eq!(single_timeline.post_count(), 34 + 29 + 21);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("2W")?);
info!("--- FETCH 4 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 23);
assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
info!("--- FETCH 5 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 0);
assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
info!("--- FETCH 6 -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("2M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 0);
assert_eq!(single_timeline.post_count(), 34 + 29 + 21 + 23);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
Ok(())
}
#[tokio::test]
async fn timeline_bundle_update_test() -> anyhow::Result<()> {
let id = Id::random();
let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager.clone(), empty_pen());
info!("--- FETCH -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 0);
assert_eq!(single_timeline.post_count(), 0);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("0")?);
let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 12)?;
info!("--- FETCH -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 12);
assert_eq!(single_timeline.post_count(), 12);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-1M")?);
post_bundle.header.num_posts += 1;
post_bundle.header.encoded_post_ids.push(Id::random());
post_bundle.header.encoded_post_lengths.push(0);
stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;
info!("--- FETCH -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 1);
assert_eq!(single_timeline.post_count(), 13);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-2M")?);
info!("--- FETCH -----------------------");
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..4]).await?;
assert_eq!(posts.len(), 0);
assert_eq!(single_timeline.post_count(), 13);
assert_eq!(single_timeline.oldest_allowed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
assert_eq!(single_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::from_epoch_offset_str("-3M")?);
Ok(())
}
#[tokio::test]
async fn timeline_healed_flag_is_per_post() -> anyhow::Result<()> {
let id = Id::random();
let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
let mut post_bundle = stub_post_bundle_manager.add_random_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", 5)?;
let healed_post_id = post_bundle.header.encoded_post_ids[2];
post_bundle.header.encoded_post_healed.insert(healed_post_id);
stub_post_bundle_manager.add_stub_post_bundle(&id, MILLIS_IN_MONTH, "1D", &post_bundle)?;
let mut single_timeline = SingleTimeline::new(BucketType::User, &id, stub_post_bundle_manager, empty_pen());
let posts = single_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("1M")?, 20, &BUCKET_DURATIONS[1..]).await?;
assert_eq!(posts.len(), 5);
let healed_count = posts.iter().filter(|(_, _, healed)| *healed).count();
assert_eq!(healed_count, 1, "exactly one post in this bundle is marked healed");
let unhealed_count = posts.iter().filter(|(_, _, healed)| !*healed).count();
assert_eq!(unhealed_count, 4);
Ok(())
}
}