use crate::record::{self, store::RecordStore, ProviderRecord, Record};
use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::PeerId;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::vec;
pub const JOBS_MAX_QUERIES: usize = 100;
pub const JOBS_MAX_NEW_QUERIES: usize = 10;
#[derive(Debug)]
struct PeriodicJob<T> {
interval: Duration,
state: PeriodicJobState<T>,
}
impl<T> PeriodicJob<T> {
fn is_running(&self) -> bool {
match self.state {
PeriodicJobState::Running(..) => true,
PeriodicJobState::Waiting(..) => false,
}
}
fn asap(&mut self) {
if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
let new_deadline = Instant::now() - Duration::from_secs(1);
*deadline = new_deadline;
delay.reset(Duration::from_secs(1));
}
}
fn check_ready(&mut self, cx: &mut Context<'_>, now: Instant) -> bool {
if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
return true;
}
}
false
}
}
#[derive(Debug)]
enum PeriodicJobState<T> {
Running(T),
Waiting(Delay, Instant),
}
pub struct PutRecordJob {
local_id: PeerId,
next_publish: Option<Instant>,
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
skipped: HashSet<record::Key>,
inner: PeriodicJob<vec::IntoIter<Record>>,
}
impl PutRecordJob {
pub fn new(
local_id: PeerId,
replicate_interval: Duration,
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
) -> Self {
let now = Instant::now();
let deadline = now + replicate_interval;
let delay = Delay::new(replicate_interval);
let next_publish = publish_interval.map(|i| now + i);
Self {
local_id,
next_publish,
publish_interval,
record_ttl,
skipped: HashSet::new(),
inner: PeriodicJob {
interval: replicate_interval,
state: PeriodicJobState::Waiting(delay, deadline),
},
}
}
pub fn skip(&mut self, key: record::Key) {
self.skipped.insert(key);
}
pub fn is_running(&self) -> bool {
self.inner.is_running()
}
pub fn asap(&mut self, publish: bool) {
if publish {
self.next_publish = Some(Instant::now() - Duration::from_secs(1))
}
self.inner.asap()
}
pub fn poll<T>(&mut self, cx: &mut Context<'_>, store: &mut T, now: Instant) -> Poll<Record>
where
for<'a> T: RecordStore<'a>,
{
if self.inner.check_ready(cx, now) {
let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
let records = store
.records()
.filter_map(|r| {
let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
if self.skipped.contains(&r.key) || (!publish && is_publisher) {
None
} else {
let mut record = r.into_owned();
if publish && is_publisher {
record.expires = record
.expires
.or_else(|| self.record_ttl.map(|ttl| now + ttl));
}
Some(record)
}
})
.collect::<Vec<_>>()
.into_iter();
if publish {
self.next_publish = self.publish_interval.map(|i| now + i);
}
self.skipped.clear();
self.inner.state = PeriodicJobState::Running(records);
}
if let PeriodicJobState::Running(records) = &mut self.inner.state {
for r in records {
if r.is_expired(now) {
store.remove(&r.key)
} else {
return Poll::Ready(r);
}
}
let deadline = now + self.inner.interval;
let delay = Delay::new(self.inner.interval);
self.inner.state = PeriodicJobState::Waiting(delay, deadline);
assert!(!self.inner.check_ready(cx, now));
}
Poll::Pending
}
}
pub struct AddProviderJob {
inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
}
impl AddProviderJob {
pub fn new(interval: Duration) -> Self {
let now = Instant::now();
Self {
inner: PeriodicJob {
interval,
state: {
let deadline = now + interval;
PeriodicJobState::Waiting(Delay::new(interval), deadline)
},
},
}
}
pub fn is_running(&self) -> bool {
self.inner.is_running()
}
pub fn asap(&mut self) {
self.inner.asap()
}
pub fn poll<T>(
&mut self,
cx: &mut Context<'_>,
store: &mut T,
now: Instant,
) -> Poll<ProviderRecord>
where
for<'a> T: RecordStore<'a>,
{
if self.inner.check_ready(cx, now) {
let records = store
.provided()
.map(|r| r.into_owned())
.collect::<Vec<_>>()
.into_iter();
self.inner.state = PeriodicJobState::Running(records);
}
if let PeriodicJobState::Running(keys) = &mut self.inner.state {
for r in keys {
if r.is_expired(now) {
store.remove_provider(&r.key, &r.provider)
} else {
return Poll::Ready(r);
}
}
let deadline = now + self.inner.interval;
let delay = Delay::new(self.inner.interval);
self.inner.state = PeriodicJobState::Waiting(delay, deadline);
assert!(!self.inner.check_ready(cx, now));
}
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::record::store::MemoryStore;
use futures::{executor::block_on, future::poll_fn};
use quickcheck::*;
use rand::Rng;
fn rand_put_record_job() -> PutRecordJob {
let mut rng = rand::thread_rng();
let id = PeerId::random();
let replicate_interval = Duration::from_secs(rng.gen_range(1..60));
let publish_interval = Some(replicate_interval * rng.gen_range(1..10));
let record_ttl = Some(Duration::from_secs(rng.gen_range(1..600)));
PutRecordJob::new(id, replicate_interval, publish_interval, record_ttl)
}
fn rand_add_provider_job() -> AddProviderJob {
let mut rng = rand::thread_rng();
let interval = Duration::from_secs(rng.gen_range(1..60));
AddProviderJob::new(interval)
}
#[test]
fn new_job_not_running() {
let job = rand_put_record_job();
assert!(!job.is_running());
let job = rand_add_provider_job();
assert!(!job.is_running());
}
#[test]
fn run_put_record_job() {
fn prop(records: Vec<Record>) {
let mut job = rand_put_record_job();
let mut store = MemoryStore::new(job.local_id);
for r in records {
let _ = store.put(r);
}
block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
if !r.is_expired(now) {
assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
assert!(job.is_running());
}
}
assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
assert!(!job.is_running());
Poll::Ready(())
}));
}
quickcheck(prop as fn(_))
}
#[test]
fn run_add_provider_job() {
fn prop(records: Vec<ProviderRecord>) {
let mut job = rand_add_provider_job();
let id = PeerId::random();
let mut store = MemoryStore::new(id);
for mut r in records {
r.provider = id;
let _ = store.add_provider(r);
}
block_on(poll_fn(|ctx| {
let now = Instant::now() + job.inner.interval;
for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
if !r.is_expired(now) {
assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r));
assert!(job.is_running());
}
}
assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending);
assert!(!job.is_running());
Poll::Ready(())
}));
}
quickcheck(prop as fn(_))
}
}