use crate::client::post_bundle::post_bundle_manager::PostBundleManager;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::client::timeline::single_timeline::SingleTimeline;
use crate::tools::time::{DurationMillis, MILLIS_IN_HOUR, TimeMillis};
use crate::tools::types::Id;
use log::trace;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::RwLock;
use crate::tools::buckets::{BucketLocation, BucketType};
/// Aggregates several `SingleTimeline`s (one per base id) that share a bucket type,
/// so posts can be fetched across all of them through a single entry point.
pub struct MultipleTimeline {
// Bucket type handed to every `SingleTimeline` at construction.
bucket_type: BucketType,
// The base ids this timeline was built from, kept in the order supplied to `new`.
base_ids: Vec<Id>,
// One `SingleTimeline` per base id; re-sorted in place on every `get_more_posts` call.
single_timelines: Vec<SingleTimeline>,
}
impl MultipleTimeline {
    /// Builds a `MultipleTimeline` holding one `SingleTimeline` per entry of `base_ids`.
    ///
    /// Every `SingleTimeline` is created with the same `bucket_type` and with its own
    /// clone of the `post_bundle_manager` and `recent_posts_pen` handles.
    pub fn new(
        bucket_type: BucketType,
        base_ids: Vec<Id>,
        post_bundle_manager: Arc<dyn PostBundleManager>,
        recent_posts_pen: Arc<RwLock<RecentPostsPen>>,
    ) -> Self {
        let mut single_timelines = Vec::with_capacity(base_ids.len());
        for base_id in &base_ids {
            single_timelines.push(SingleTimeline::new(
                bucket_type,
                base_id,
                Arc::clone(&post_bundle_manager),
                Arc::clone(&recent_posts_pen),
            ));
        }
        Self { bucket_type, base_ids, single_timelines }
    }

    /// Returns the base ids this timeline was constructed with.
    pub fn base_ids(&self) -> &Vec<Id> {
        &self.base_ids
    }

    /// Returns the bucket type shared by all underlying single timelines.
    pub fn bucket_type(&self) -> BucketType {
        self.bucket_type
    }

    /// Fetches another batch of posts by querying the underlying single timelines in turn.
    ///
    /// The single timelines are first sorted by ascending penalty, where the penalty is
    /// `time_millis - oldest_processed_post_bundle_time_millis()` plus `MILLIS_IN_HOUR`
    /// multiplied by the timeline's `post_count()`. They are then queried in that order,
    /// each with `max_posts_per_single_timeline` and `bucket_durations`, and the results
    /// are appended to one vector.
    ///
    /// The `max_posts` limit is only checked after a timeline's results have been
    /// appended, so at least one timeline is always queried (when any exist) and the
    /// returned vector may hold more than `max_posts` entries.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a single timeline; posts gathered before the
    /// failure are not returned.
    pub async fn get_more_posts(
        &mut self,
        time_millis: TimeMillis,
        max_posts: usize,
        max_posts_per_single_timeline: usize,
        bucket_durations: &[DurationMillis],
    ) -> anyhow::Result<Vec<(BucketLocation, Bytes, bool)>> {
        // Lowest penalty first: a small distance between `time_millis` and the oldest
        // processed bundle, and a low post count, both move a timeline to the front.
        self.single_timelines.sort_by_key(|timeline| {
            let age_penalty = time_millis - timeline.oldest_processed_post_bundle_time_millis();
            let volume_penalty = MILLIS_IN_HOUR.const_mul(timeline.post_count() as i64);
            age_penalty + volume_penalty
        });

        let mut collected = Vec::new();
        for timeline in self.single_timelines.iter_mut() {
            trace!("Fetching posts from SingleTimeline with base_id={}", timeline.base_id());
            let batch = timeline
                .get_more_posts(time_millis, max_posts_per_single_timeline, bucket_durations)
                .await?;
            collected.extend(batch);
            // Checked after appending, so the final batch may push the total past the limit.
            if collected.len() >= max_posts {
                break;
            }
        }
        Ok(collected)
    }

    /// Returns the smallest `oldest_processed_post_bundle_time_millis()` reported by any
    /// underlying single timeline, or `TimeMillis::MAX` when there are no single timelines.
    pub fn oldest_processed_post_bundle_time_millis(&self) -> TimeMillis {
        let per_timeline = self
            .single_timelines
            .iter()
            .map(|timeline| timeline.oldest_processed_post_bundle_time_millis());
        per_timeline.min().unwrap_or(TimeMillis::MAX)
    }
}
// Tests for `MultipleTimeline`, driven by a `StubPostBundleManager` that is populated
// with random stub post bundles before each scenario.
#[cfg(test)]
pub mod tests {
use crate::client::post_bundle::stub_post_bundle_manager::StubPostBundleManager;
use crate::client::timeline::multiple_timeline::MultipleTimeline;
use crate::client::timeline::recent_posts_pen::RecentPostsPen;
use crate::tools::buckets::{BucketType, BUCKET_DURATIONS};
use crate::tools::time::{MILLIS_IN_MONTH, MILLIS_IN_WEEK, TimeMillis};
use crate::tools::types::Id;
use log::info;
use std::sync::Arc;
use tokio::sync::RwLock;
// Returns a freshly constructed `RecentPostsPen` wrapped for shared async access.
fn empty_pen() -> Arc<RwLock<RecentPostsPen>> {
Arc::new(RwLock::new(RecentPostsPen::new()))
}
// Registers stub bundles for three ids at offsets "5M" down to "1M" (plus one weekly
// bundle at "2M1W" for ids[0]), then calls `get_more_posts` repeatedly from "6M" and
// checks the exact number of posts returned by each call.
#[tokio::test]
async fn timeline_single_test() -> anyhow::Result<()> {
let ids = vec![Id::from_slice(&[0x11; 32])?, Id::from_slice(&[0x22; 32])?, Id::from_slice(&[0x33; 32])?];
let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
{
// One row per offset, one call per id. The last argument is presumably the number
// of posts in the stub bundle — confirm against `StubPostBundleManager`.
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "5M", 29)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "5M", 23)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "5M", 27)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "4M", 25)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "4M", 21)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "4M", 28)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "3M", 6)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "3M", 9)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "3M", 7)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "2M", 35)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "2M", 9)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "2M", 11)?; stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_WEEK, "2M1W", 8)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[2], MILLIS_IN_MONTH, "1M", 3)?; }
let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());
// Drop the first entry of BUCKET_DURATIONS; per the variable name the remaining
// slice starts at the monthly duration.
let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];
// Every fetch below uses max_posts = 20 and max_posts_per_single_timeline = 5.
// The asserted counts may exceed 20 (e.g. 29) because the limit is checked only
// after a single timeline's results have been appended.
{
// Expected counts equal the three "5M" bundle sizes registered above (29, 23, 27).
info!("1)--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(29, posts.len());
info!("--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(23, posts.len());
info!("--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(27, posts.len());
}
{
// Expected counts equal the three "4M" bundle sizes registered above (21, 28, 25).
info!("2)--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(21, posts.len());
info!("--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(28, posts.len());
info!("--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(25, posts.len());
}
{
// The "3M" bundles are each smaller than 20, so one call is expected to return
// all three of them combined.
info!("3)--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(9 + 6 + 7, posts.len());
}
{
// NOTE(review): 28 presumably combines several of the smaller bundles around "2M"
// (e.g. 9 + 11 + 8) — confirm against `SingleTimeline` behaviour.
info!("4)--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(28, posts.len());
}
{
// Remaining data: 35 and 3 match the ids[0] "2M" and ids[2] "1M" bundle sizes;
// the final fetch must return nothing.
info!("5) --- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(35, posts.len());
info!("6) --- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(3, posts.len());
info!("--- FETCH -----------------------");
let posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("6M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
assert_eq!(0, posts.len());
}
Ok(())
}
// Checks the aggregate `oldest_processed_post_bundle_time_millis`: it is
// `TimeMillis::MAX` before any fetch and does not increase between successive fetches.
#[tokio::test]
async fn oldest_processed_post_bundle_time_millis_is_min_of_single_timelines() -> anyhow::Result<()> {
let ids = vec![Id::from_slice(&[0x11; 32])?, Id::from_slice(&[0x22; 32])?];
let stub_post_bundle_manager = Arc::new(StubPostBundleManager::default());
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[0], MILLIS_IN_MONTH, "3M", 5)?;
stub_post_bundle_manager.add_random_stub_post_bundle(&ids[1], MILLIS_IN_MONTH, "2M", 5)?;
let mut multiple_timeline = MultipleTimeline::new(BucketType::User, ids, stub_post_bundle_manager, empty_pen());
let bucket_durations_starting_at_monthly = &BUCKET_DURATIONS[1..];
// Nothing has been fetched yet, so the reported value is still TimeMillis::MAX.
assert_eq!(multiple_timeline.oldest_processed_post_bundle_time_millis(), TimeMillis::MAX);
let _posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("4M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
let oldest_after_first_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();
let _posts = multiple_timeline.get_more_posts(TimeMillis::from_epoch_offset_str("4M")?, 20, 5, bucket_durations_starting_at_monthly).await?;
let oldest_after_second_fetch = multiple_timeline.oldest_processed_post_bundle_time_millis();
// A second fetch may only move the oldest processed time further back, never forward.
assert!(oldest_after_second_fetch <= oldest_after_first_fetch);
Ok(())
}
}