use std::path::PathBuf;
use std::time::Duration;
use std::time::SystemTime;
use anyhow::Result;
use common::util;
use model::Model;
use slog::error;
use crate::DataFrame;
use crate::Direction;
use crate::LocalStore;
use crate::RemoteStore;
use crate::Store;
struct SamplePackage<SampleType> {
older_sample: Option<SampleType>,
newer_sample: SampleType,
timestamp: SystemTime,
duration: Duration,
}
impl<SampleType> SamplePackage<SampleType> {
fn new(
older_sample: Option<SampleType>,
older_timestamp: SystemTime,
newer_sample: SampleType,
newer_timestamp: SystemTime,
) -> Self {
Self {
older_sample,
newer_sample,
timestamp: newer_timestamp,
duration: newer_timestamp
.duration_since(older_timestamp)
.expect("time went backwards"),
}
}
}
impl SamplePackage<DataFrame> {
pub fn to_model(&self) -> Model {
if let Some(older_sample) = self.older_sample.as_ref() {
Model::new(
self.timestamp,
&self.newer_sample.sample,
Some((&older_sample.sample, self.duration)),
)
} else {
Model::new(self.timestamp, &self.newer_sample.sample, None)
}
}
}
trait ModelStore: Store {
type ModelType;
fn to_model(&self, sample_package: &SamplePackage<Self::SampleType>)
-> Option<Self::ModelType>;
fn extract_sample_and_log(
&mut self,
timestamp: SystemTime,
direction: Direction,
logger: &slog::Logger,
) -> Option<(SystemTime, Self::SampleType)> {
match self.get_sample_at_timestamp(timestamp, direction) {
Ok(None) => None,
Ok(val) => val,
Err(e) => {
error!(logger, "{:#}", e.context("Failed to load from store"));
None
}
}
}
fn get_adjacent_sample_at_timestamp(
&mut self,
timestamp: SystemTime,
direction: Direction,
logger: &slog::Logger,
) -> Option<SamplePackage<Self::SampleType>> {
let (target_ts, target_sample) =
self.extract_sample_and_log(timestamp, direction, logger)?;
let mut res_package = SamplePackage {
older_sample: None,
newer_sample: target_sample,
timestamp: target_ts,
duration: Duration::from_secs(0),
};
if let Some((older_ts, older_sample)) = self.extract_sample_and_log(
res_package.timestamp - Duration::from_secs(1),
Direction::Reverse,
logger,
) {
res_package.older_sample = Some(older_sample);
res_package.duration = res_package
.timestamp
.duration_since(older_ts)
.expect("time went backwards");
}
Some(res_package)
}
}
impl ModelStore for LocalStore {
type ModelType = Model;
fn to_model(&self, sample_package: &SamplePackage<DataFrame>) -> Option<Model> {
Some(sample_package.to_model())
}
}
impl ModelStore for RemoteStore {
type ModelType = Model;
fn to_model(&self, sample_package: &SamplePackage<DataFrame>) -> Option<Model> {
Some(sample_package.to_model())
}
}
pub struct Advance<FrameType, MType> {
logger: slog::Logger,
store: Box<dyn ModelStore<SampleType = FrameType, ModelType = MType>>,
cached_sample: Option<FrameType>,
target_timestamp: SystemTime,
current_direction: Direction,
}
impl<FrameType, ModelType> Advance<FrameType, ModelType> {
pub fn initialize(&mut self) {
assert!(self.cached_sample.is_none());
if let Some((timestamp, sample)) = self.store.extract_sample_and_log(
self.target_timestamp,
Direction::Forward,
&self.logger,
) {
self.cached_sample = Some(sample);
self.target_timestamp = timestamp;
}
}
pub fn advance(&mut self, direction: Direction) -> Option<ModelType> {
let target_timestamp = match direction {
Direction::Forward => self.target_timestamp + Duration::from_secs(1),
Direction::Reverse => self.target_timestamp - Duration::from_secs(1),
};
let (next_timestamp, next_sample) =
self.store
.extract_sample_and_log(target_timestamp, direction, &self.logger)?;
if direction != self.current_direction {
self.current_direction = direction;
self.cached_sample = Some(next_sample);
self.target_timestamp = next_timestamp;
return self.advance(direction);
}
match direction {
Direction::Forward => {
let sample_package = SamplePackage::<FrameType>::new(
self.cached_sample.take(),
self.target_timestamp,
next_sample,
next_timestamp,
);
let model = self.store.to_model(&sample_package);
self.cached_sample = Some(sample_package.newer_sample);
self.target_timestamp = next_timestamp;
model
}
Direction::Reverse => {
let sample_package = SamplePackage::<FrameType>::new(
Some(next_sample),
next_timestamp,
self.cached_sample.take().expect(
"No cached sample avaialbe, the Advance module may not be initialized",
),
self.target_timestamp,
);
let model = self.store.to_model(&sample_package);
self.cached_sample = sample_package.older_sample;
self.target_timestamp = next_timestamp;
model
}
}
}
pub fn jump_sample_to(&mut self, timestamp: SystemTime) -> Option<ModelType> {
let mut sample_package = self.store.get_adjacent_sample_at_timestamp(
timestamp,
Direction::Forward,
&self.logger,
);
if sample_package.is_none() {
sample_package = self.store.get_adjacent_sample_at_timestamp(
timestamp,
Direction::Reverse,
&self.logger,
);
}
let sample_package = sample_package?;
let model = self.store.to_model(&sample_package);
self.current_direction = Direction::Forward;
self.cached_sample = Some(sample_package.newer_sample);
self.target_timestamp = sample_package.timestamp;
model
}
pub fn get_latest_sample(&mut self) -> Option<ModelType> {
self.jump_sample_to(SystemTime::now())
}
pub fn jump_sample_forward(&mut self, duration: humantime::Duration) -> Option<ModelType> {
self.jump_sample_to(self.target_timestamp + Duration::from_secs(duration.as_secs()))
}
pub fn jump_sample_backward(&mut self, duration: humantime::Duration) -> Option<ModelType> {
let gap = Duration::from_secs(duration.as_secs());
if util::get_unix_timestamp(self.target_timestamp) < gap.as_secs() {
return None;
}
self.jump_sample_to(self.target_timestamp - gap)
}
pub fn get_next_ts(&self) -> SystemTime {
if self.cached_sample.is_none() {
return self.target_timestamp;
}
match self.current_direction {
Direction::Forward => self.target_timestamp + Duration::from_secs(1),
Direction::Reverse => self.target_timestamp - Duration::from_secs(1),
}
}
}
pub fn new_advance_local(
logger: slog::Logger,
store_dir: PathBuf,
timestamp: SystemTime,
) -> Advance<DataFrame, Model> {
let store = Box::new(LocalStore::new(logger.clone(), store_dir));
Advance {
logger,
store,
cached_sample: None,
target_timestamp: timestamp,
current_direction: Direction::Forward,
}
}
pub fn new_advance_remote(
logger: slog::Logger,
host: String,
port: Option<u16>,
timestamp: SystemTime,
) -> Result<Advance<DataFrame, Model>> {
let store = Box::new(RemoteStore {
store: crate::remote_store::RemoteStore::new(host, port)?,
});
Ok(Advance {
logger,
store,
cached_sample: None,
target_timestamp: timestamp,
current_direction: Direction::Forward,
})
}
#[cfg(test)]
mod tests {
use anyhow::bail;
use super::*;
fn get_logger() -> slog::Logger {
slog::Logger::root(slog::Discard, slog::o!())
}
struct FakeStore {
sample: Vec<u64>,
raise_error: bool,
}
impl FakeStore {
fn new() -> Self {
let mut sample = vec![3, 10, 20, 50];
sample.sort_unstable();
Self {
sample,
raise_error: false,
}
}
fn raise_error(&mut self) {
self.raise_error = true;
}
}
impl Store for FakeStore {
type SampleType = u64;
fn get_sample_at_timestamp(
&mut self,
timestamp: SystemTime,
direction: Direction,
) -> Result<Option<(SystemTime, Self::SampleType)>> {
if self.raise_error {
bail!("error");
}
let timestamp = util::get_unix_timestamp(timestamp);
if self.sample.is_empty()
|| (timestamp < *self.sample.first().unwrap() && direction == Direction::Reverse)
|| (timestamp > *self.sample.last().unwrap() && direction == Direction::Forward)
{
return Ok(None);
}
match self.sample.binary_search(×tamp) {
Ok(_) => Ok(Some((util::get_system_time(timestamp), timestamp))),
Err(idx) => match direction {
Direction::Reverse => Ok(Some((
util::get_system_time(self.sample[idx - 1]),
self.sample[idx - 1],
))),
Direction::Forward => Ok(Some((
util::get_system_time(self.sample[idx]),
self.sample[idx],
))),
},
}
}
}
impl ModelStore for FakeStore {
type ModelType = String;
fn to_model(&self, sample_package: &SamplePackage<u64>) -> Option<String> {
if let Some(older_sample) = sample_package.older_sample.as_ref() {
Some(format!(
"{}_{}_{}_{}",
older_sample,
sample_package.newer_sample,
util::get_unix_timestamp(sample_package.timestamp),
sample_package.duration.as_secs()
))
} else {
Some(format!(
"{}_{}",
sample_package.newer_sample,
util::get_unix_timestamp(sample_package.timestamp)
))
}
}
}
fn get_advance_with_fake_store(timestamp: u64) -> Advance<u64, String> {
Advance::<u64, String> {
logger: get_logger(),
store: Box::new(FakeStore::new()),
cached_sample: None,
target_timestamp: util::get_system_time(timestamp),
current_direction: Direction::Forward,
}
}
#[test]
fn store_operation_test_with_fake_store() {
let mut store = FakeStore::new();
macro_rules! check_sample {
($query:tt, $expected:tt, $direction:expr) => {
let timestamp = util::get_system_time($query);
let res = store.get_sample_at_timestamp(timestamp, $direction);
assert_eq!(
res.expect("Fail to get sample."),
Some((util::get_system_time($expected), $expected))
);
};
($query:tt, $direction:expr) => {
let timestamp = util::get_system_time($query);
let res = store.get_sample_at_timestamp(timestamp, $direction);
assert_eq!(res.expect("Fail to get sample."), None);
};
}
check_sample!(20 , 20 , Direction::Forward);
check_sample!(20 , 20 , Direction::Reverse);
check_sample!(0 , 3 , Direction::Forward);
check_sample!(0 , Direction::Reverse);
check_sample!(60 , Direction::Forward);
check_sample!(60 , 50 , Direction::Reverse);
check_sample!(30 , 50 , Direction::Forward);
check_sample!(30 , 20 , Direction::Reverse);
store.raise_error();
let res = store.get_sample_at_timestamp(util::get_system_time(0), Direction::Forward);
assert!(res.is_err());
}
#[test]
fn store_operation_test_get_adjacent_sample_at_timestamp() {
let mut store = FakeStore::new();
macro_rules! check_sample {
($query:tt, $direction:expr, $expected_sample:expr) => {
let timestamp = util::get_system_time($query);
let res =
store.get_adjacent_sample_at_timestamp(timestamp, $direction, &get_logger());
assert_eq!(
store
.to_model(&res.expect("Failed to get sample"))
.expect("Failed to convert sample to model"),
$expected_sample
);
};
($query:tt, $direction:expr) => {
let timestamp = util::get_system_time($query);
let res =
store.get_adjacent_sample_at_timestamp(timestamp, $direction, &get_logger());
assert!(res.is_none());
};
}
for direction in [Direction::Forward, Direction::Reverse] {
check_sample!(
10,
direction,
"3_10_10_7"
);
}
check_sample!(
7,
Direction::Forward,
"3_10_10_7"
);
check_sample!(
7,
Direction::Reverse,
"3_3"
);
check_sample!(
12,
Direction::Reverse,
"3_10_10_7"
);
check_sample!(
0,
Direction::Forward,
"3_3"
);
check_sample!(0 , Direction::Reverse);
check_sample!(
60,
Direction::Reverse,
"20_50_50_30"
);
check_sample!(60 , Direction::Forward);
}
#[test]
fn advance_test_initialize() {
macro_rules! check_advance {
($init_time:tt, $expected:expr) => {
let mut advance = get_advance_with_fake_store($init_time);
advance.initialize();
assert_eq!(advance.cached_sample, $expected);
if advance.cached_sample.is_some() {
assert_eq!(
advance.target_timestamp,
util::get_system_time($expected.expect("Didn't init"))
);
} else {
assert_eq!(advance.target_timestamp, util::get_system_time($init_time));
}
};
}
check_advance!(10 , Some(10) );
check_advance!(4 , Some(10) );
check_advance!(2 , Some(3) );
check_advance!(60 , None );
}
macro_rules! advance {
($adv:expr, $direction:expr, $expected_cache:expr, $model:expr) => {
let res = $adv.advance($direction);
assert_eq!(res, $model);
assert_eq!($adv.cached_sample, Some($expected_cache));
assert_eq!(
$adv.target_timestamp,
util::get_system_time($expected_cache)
);
assert_eq!($adv.current_direction, $direction);
};
}
#[test]
fn advance_test_advance_continous_move() {
let mut advance = get_advance_with_fake_store(3);
advance.initialize();
for (old, new) in [(3, 10), (10, 20), (20, 50)] {
advance!(
advance,
Direction::Forward,
new,
Some(format!("{}_{}_{}_{}", old, new, new, new - old))
);
}
for _ in 0..5 {
advance!(
advance,
Direction::Forward,
50,
None
);
}
for (old, new) in [(10, 20), (3, 10)] {
advance!(
advance,
Direction::Reverse,
old,
Some(format!("{}_{}_{}_{}", old, new, new, new - old))
);
}
for _ in 0..5 {
advance!(advance, Direction::Reverse, 3 , None);
}
}
#[test]
fn advance_test_advance_direction_change() {
let mut advance = get_advance_with_fake_store(10);
advance.initialize();
advance!(
advance,
Direction::Forward,
20,
Some("10_20_20_10".into())
);
advance!(
advance,
Direction::Reverse,
3,
Some("3_10_10_7".into())
);
let mut advance = get_advance_with_fake_store(10);
advance.initialize();
advance!(
advance,
Direction::Reverse,
3,
None
);
}
#[test]
fn advance_test_jump_sample_to() {
let mut advance = get_advance_with_fake_store(3);
advance.initialize();
macro_rules! check_jump {
($query:tt, $expected_cache:expr, $expected_sample:expr) => {
let timestamp = util::get_system_time($query);
let res = advance.jump_sample_to(timestamp);
assert_eq!(res.expect("Failed to get sample"), $expected_sample);
assert_eq!(advance.current_direction, Direction::Forward);
assert_eq!(
advance.target_timestamp,
util::get_system_time($expected_cache)
);
assert_eq!(advance.cached_sample, Some($expected_cache));
};
}
check_jump!(
20,
20,
"10_20_20_10"
);
check_jump!(
15,
20,
"10_20_20_10"
);
check_jump!(
60,
50,
"20_50_50_30"
);
check_jump!(
1,
3,
"3_3"
);
}
#[test]
fn advance_test_jump_util() {
let mut advance = get_advance_with_fake_store(3);
advance.initialize();
assert_eq!(
advance
.jump_sample_forward(Duration::from_secs(10).into())
.expect("Failed to jump sample forward"),
"10_20_20_10"
);
assert_eq!(
advance
.jump_sample_backward(Duration::from_secs(10).into())
.expect("Failed to jump sample backward"),
"3_10_10_7"
);
assert_eq!(
advance
.get_latest_sample()
.expect("Failed to get lastest sample"),
"20_50_50_30"
);
}
#[test]
fn advance_test_get_next_ts() {
let mut advance = get_advance_with_fake_store(3);
assert_eq!(advance.get_next_ts(), util::get_system_time(3));
advance.initialize();
assert_eq!(advance.get_next_ts(), util::get_system_time(4));
advance.advance(Direction::Forward);
advance.advance(Direction::Reverse);
assert_eq!(advance.get_next_ts(), util::get_system_time(2));
}
}