use crate::{
config::{RecordConfig, RotateDirective},
logfile::{BatchItem, Id, BATCH_POOL},
logfile_collection::ArchiveCollectionWriter,
recorder::{BCastMsg, ShardId, Shards},
};
use ahash::{AHashMap, AHashSet};
use anyhow::{Context, Result};
use arcstr::ArcStr;
use chrono::prelude::*;
use futures::{
channel::{mpsc, oneshot},
future::{self, Fuse},
prelude::*,
select_biased,
};
use log::{error, info, warn};
use netidx::{
path::Path,
protocol::glob::{Glob, GlobSet},
resolver_client::{ChangeTracker, ResolverRead},
subscriber::{Dval, Event, SubId, Subscriber, UpdatesFlags},
utils::{self, Batched},
};
use nohash::IntMap;
use poolshark::global::GPooled;
use std::{collections::BTreeMap, ops::Bound, sync::Arc, time::Duration};
use tokio::{
task,
time::{self, Instant},
};
/// Set of resolver change trackers, keyed by the base path each one watches.
///
/// One tracker is created per glob base, except that a base lying underneath
/// an already tracked base does not get a tracker of its own.
#[derive(Debug)]
struct CTS(BTreeMap<Path, ChangeTracker>);

impl CTS {
    /// Build the tracker set for `globs`.
    ///
    /// Bases are visited in sorted order so a parent is always seen before any
    /// of its children, and a base is skipped when *any* tracked path is its
    /// parent. Checking only the nearest lexical predecessor is not enough: with
    /// `/a` and `/a-b` tracked, the predecessor of `/a/c` is `/a-b`, which would
    /// hide the fact that `/a` already covers it.
    fn new(globs: &Vec<Glob>) -> CTS {
        let mut bases: Vec<_> = globs.iter().map(|glob| glob.base()).collect();
        bases.sort_unstable();
        bases.dedup();
        let mut btm: BTreeMap<Path, ChangeTracker> = BTreeMap::new();
        for base in bases {
            if btm.keys().any(|tracked| Path::is_parent(tracked, base)) {
                // an ancestor is already tracked, nothing to add
                continue;
            }
            let base = Path::from(ArcStr::from(base));
            let ct = ChangeTracker::new(base.clone());
            btm.insert(base, ct);
        }
        CTS(btm)
    }

    /// Ask the resolver about every tracker and report whether any changed.
    ///
    /// All checks are issued together via `join_all`; results are then examined
    /// in key order.
    ///
    /// # Errors
    /// Returns the first failed check encountered (in key order) before a
    /// positive result, with context attached.
    async fn changed(&mut self, r: &ResolverRead) -> Result<bool> {
        let checks = self.0.values_mut().map(|ct| r.check_changed(ct));
        for outcome in future::join_all(checks).await {
            if outcome.context("checking for resolver changes")? {
                return Ok(true);
            }
        }
        Ok(false)
    }
}
/// Wait for the next tick of `poll`, or never complete when `poll` is `None`.
///
/// This lets a select arm be switched off by leaving its interval unset.
async fn maybe_interval(poll: &mut Option<time::Interval>) {
    if let Some(timer) = poll.as_mut() {
        timer.tick().await;
    } else {
        future::pending::<()>().await
    }
}
/// Reply to one listing request made to the list task: `Some` carries the
/// batches of paths produced by the resolver listing, `None` means there is
/// nothing to process (no change was detected, or the check/listing failed).
type Lst = Option<GPooled<Vec<GPooled<Vec<Path>>>>>;
/// Serve listing requests arriving on `rx` until that channel is closed.
///
/// For each request the task sleeps for a random jitter of up to 10% of
/// `interval`, checks the change trackers built from `spec`, and lists the
/// matching paths only when a change was reported. The requester is answered
/// with `Some(listing)` in that case and `None` otherwise (no change, or a
/// resolver error, which is logged and left for the next request to retry).
///
/// A closed reply channel is ignored; the requester has simply gone away.
async fn list_task(
    interval: Duration,
    mut rx: mpsc::UnboundedReceiver<oneshot::Sender<Lst>>,
    resolver: ResolverRead,
    spec: GlobSet,
) -> Result<()> {
    use rand::{rng, RngExt};
    let mut cts = CTS::new(&spec);
    let max_jitter = interval.as_secs_f64() * 0.1;
    while let Some(reply) = rx.next().await {
        // sampling from an empty range panics, so a zero interval gets no jitter
        let wait = if max_jitter > 0. { rng().random_range(0. ..max_jitter) } else { 0. };
        time::sleep(Duration::from_secs_f64(wait)).await;
        let answer: Lst = match cts.changed(&resolver).await {
            Ok(true) => match resolver.list_matching(&spec).await {
                Ok(lst) => Some(lst),
                Err(e) => {
                    warn!("list_task: list_matching failed {}, will retry", e);
                    None
                }
            },
            Ok(false) => None,
            Err(e) => {
                warn!("list_task: check_changed failed {}, will retry", e);
                None
            }
        };
        let _ = reply.send(answer);
    }
    Ok(())
}
fn start_list_task(
poll_interval: Duration,
rx: mpsc::UnboundedReceiver<oneshot::Sender<Lst>>,
resolver: ResolverRead,
spec: GlobSet,
) {
task::spawn(async move {
let r = list_task(poll_interval, rx, resolver, spec).await;
match r {
Err(e) => error!("list task exited with error {}", e),
Ok(()) => info!("list task exited"),
}
});
}
/// Wait for the outstanding listing reply, if there is one.
///
/// Never completes while `pending` is `None`. A reply channel that was dropped
/// without an answer is reported as `None`, the same as "nothing to process".
async fn wait_list(pending: &mut Option<Fuse<oneshot::Receiver<Lst>>>) -> Lst {
    let Some(reply) = pending.as_mut() else {
        return future::pending().await;
    };
    reply.await.ok().flatten()
}
/// Register newly subscribed paths in the archive's path index and record the
/// archive id assigned to each subscription.
///
/// `to_add` is drained. For every `(path, subid)` pair whose `subid` is not yet
/// in `by_subid`, the id the archive reports for `path` is stored. The path
/// index is flushed once at the end.
///
/// # Errors
/// Fails if adding the paths or flushing the path index fails, or if the
/// archive has no id for a path that was just added. The latter is reported as
/// an error rather than a panic so the caller can shut down cleanly.
fn write_pathmap(
    archive: &mut ArchiveCollectionWriter,
    to_add: &mut Vec<(Path, SubId)>,
    by_subid: &mut IntMap<SubId, Id>,
) -> Result<()> {
    // adding paths is run via block_in_place, as in the original
    task::block_in_place(|| {
        let paths = to_add.iter().map(|(path, _)| path);
        archive.add_paths(paths)
    })
    .context("adding paths to pathindex")?;
    for (path, subid) in to_add.drain(..) {
        if !by_subid.contains_key(&subid) {
            let id = archive
                .id_for_path(&path)
                .with_context(|| format!("no id in pathindex for path {path:?}"))?;
            by_subid.insert(subid, id);
        }
    }
    archive.flush_pathindex().context("flushing pathindex")?;
    Ok(())
}
/// Write an image batch stamped `ts` containing the last event held in `image`
/// for every subscription that has an archive id in `by_subid`.
///
/// Entries of `image` without a known archive id are left out.
///
/// # Errors
/// Fails if the archive rejects the batch.
fn write_image(
    archive: &mut ArchiveCollectionWriter,
    by_subid: &IntMap<SubId, Id>,
    image: &IntMap<SubId, Event>,
    ts: DateTime<Utc>,
) -> Result<()> {
    let mut snapshot = BATCH_POOL.take();
    let items = image.iter().filter_map(|(subid, ev)| {
        by_subid.get(subid).map(|id| BatchItem(*id, ev.clone()))
    });
    snapshot.extend(items);
    archive.add_batch(true, ts, &snapshot).context("adding image batch")?;
    Ok(())
}
/// Main recording loop for a single shard.
///
/// Takes the shard's archive writer out of `shards.writers`, then loops
/// servicing, in the priority order given to `select_biased!`: the poll timer
/// (requests a fresh path listing), the flush timer, the rotate timer,
/// completed listings (subscribe to new paths, drop vanished ones, update the
/// path index), and incoming update batches (written to the archive and sent
/// on the shard's broadcast channel).
///
/// # Errors
/// Fails if no writer is registered for `shard_id`, or if any archive
/// operation (length query, flush, rotate, path index or batch write) or the
/// rotation notification fails. Returns `Ok(())` once the update stream ends.
pub(super) async fn run(
shards: Arc<Shards>,
subscriber: Subscriber,
record_config: Arc<RecordConfig>,
shard_id: ShardId,
shard_name: ArcStr,
) -> Result<()> {
// the writer is removed from the shared table, so this task becomes its owner
let mut archive = shards
.writers
.lock()
.remove(&shard_id)
.ok_or_else(|| anyhow!("no writer for shard {shard_name}"))?;
// subscription updates arrive on this channel, bounded by the configured slack
let (tx_batch, rx_batch) = mpsc::channel(record_config.slack);
// NOTE(review): 10000 is presumably the maximum number of items grouped into
// one batch by `Batched` — confirm against its documentation
let mut rx_batch = Batched::new(rx_batch, 10000);
// requests to the list task; each request carries the oneshot to reply on
let (tx_list, rx_list) = mpsc::unbounded();
// subscription id -> archive id, filled in by write_pathmap
let mut by_subid: IntMap<SubId, Id> = IntMap::default();
// last event seen per subscription, used to write image batches
let mut image: IntMap<SubId, Event> = IntMap::default();
// live subscriptions; removing an entry drops its Dval
let mut subscribed: AHashMap<Path, Dval> = AHashMap::default();
// NOTE(review): indexing presumably panics if the shard has no broadcast
// channel — confirm that one is always registered
let bcast = shards.bcast[&shard_id].clone();
let block_size = archive.block_size()?;
// the configured flush frequency is a multiple of the archive block size
let flush_frequency = record_config.flush_frequency.map(|f| block_size * f);
// unlike flush and rotate below, the poll timer is not delayed by one period
let mut poll = record_config.poll_interval.map(time::interval);
let mut flush =
record_config.flush_interval.map(|d| time::interval_at(Instant::now() + d, d));
let mut rotate = match record_config.rotate_interval {
RotateDirective::Never => None,
RotateDirective::Interval(d) => Some(time::interval_at(Instant::now() + d, d)),
// size based rotation is implemented by checking the size once a second
RotateDirective::Size(_) => {
let d = Duration::from_secs(1);
Some(time::interval_at(Instant::now() + d, d))
}
};
// scratch collections reused across listing results
let mut to_add: Vec<(Path, SubId)> = Vec::new();
let mut all_paths: AHashSet<Path> = AHashSet::default();
let mut remove_paths: Vec<Path> = vec![];
// archive length at the time of the last image / flush
let mut last_image = archive.len()?;
let mut last_flush = archive.len()?;
// at most one listing request is outstanding at a time
let mut pending_list: Option<Fuse<oneshot::Receiver<Lst>>> = None;
// batch counter and timestamp for the batches/s log line
let mut batches = 0;
let mut last_batches = Instant::now();
// parts of the batch currently being received, until EndBatch arrives
let mut queued = Vec::new();
// the list task is only started when polling is configured
if let Some(interval) = record_config.poll_interval {
start_list_task(
interval,
rx_list,
subscriber.resolver(),
record_config.spec.clone(),
);
}
loop {
select_biased! {
// poll timer: ask the list task for a listing unless one is already pending
_ = maybe_interval(&mut poll).fuse() => {
if pending_list.is_none() {
let (tx, rx) = oneshot::channel();
// a send failure drops `tx`, which wait_list reports as None
let _ = tx_list.unbounded_send(tx);
pending_list = Some(rx.fuse());
}
},
// flush timer: flush if the archive grew, then log the batch rate
_ = maybe_interval(&mut flush).fuse() => {
if archive.len()? > last_flush {
task::block_in_place(|| -> Result<()> {
archive.flush_current().context("flushing archive")?;
// the assignment evaluates to (), so this is Ok(())
Ok(last_flush = archive.len()?)
})?;
}
let now = Instant::now();
let elapsed = now - last_batches;
info!("recorded: {} batches/s", batches as f32 / elapsed.as_secs_f32());
last_batches = now;
batches = 0;
},
// rotate timer: decide whether to rotate according to the directive
_ = maybe_interval(&mut rotate).fuse() => {
let rotate = match record_config.rotate_interval {
RotateDirective::Never => false,
RotateDirective::Size(sz) => archive.len()? >= sz,
RotateDirective::Interval(_) => true,
};
if rotate {
let now = Utc::now();
task::block_in_place(|| -> Result<()> {
archive.rotate(now).context("rotating log file")?;
// the length baselines restart from zero after a rotation
last_image = 0;
last_flush = 0;
// write the current image right after rotating
write_image(&mut archive, &by_subid, &image, now)
.context("writing image")?;
let reader = archive.current_reader()
.context("getting reader")?;
shards.notify_rotated(shard_id, now, reader)
})?;
}
},
// a listing request completed (or its reply channel was dropped)
r = wait_list(&mut pending_list).fuse() => {
pending_list = None;
// note: this `batches` shadows the batch counter inside this block
if let Some(mut batches) = r {
for mut batch in batches.drain(..) {
for path in batch.drain(..) {
all_paths.insert(path.clone());
// subscribe to paths we have not seen before
if !subscribed.contains_key(&path) {
let dv = subscriber.subscribe(path.clone());
let id = dv.id();
dv.updates(
UpdatesFlags::BEGIN_WITH_LAST
| UpdatesFlags::STOP_COLLECTING_LAST,
tx_batch.clone()
);
subscribed.insert(path.clone(), dv);
to_add.push((path, id));
}
}
}
// anything subscribed but absent from the new listing is dropped
for path in subscribed.keys() {
if !all_paths.contains(path) {
remove_paths.push(path.clone());
}
}
all_paths.clear();
for path in remove_paths.drain(..) {
if let Some(dv) = subscribed.remove(&path) {
image.remove(&dv.id());
by_subid.remove(&dv.id());
}
}
// assign archive ids to the new subscriptions before any of their
// updates are processed
write_pathmap(&mut archive, &mut to_add, &mut by_subid)
.context("writing pathmap")?
}
},
// subscription updates
batch = rx_batch.next() => match batch {
// the update stream ended, stop recording
None => break,
Some(utils::BatchItem::InBatch(batch)) => queued.push(batch),
Some(utils::BatchItem::EndBatch) => {
batches += 1;
// every queued part is written with the same timestamp
let now = Utc::now();
task::block_in_place(|| -> Result<()> {
for mut batch in queued.drain(..) {
let mut tbatch = BATCH_POOL.take();
for (subid, ev) in batch.drain(..) {
// the image is only maintained when images are configured
if record_config.image_frequency.is_some() {
image.insert(subid, ev.clone());
}
// events without a known archive id are not written
if let Some(id) = by_subid.get(&subid) {
tbatch.push(BatchItem(*id, ev));
}
}
archive.add_batch(false, now, &tbatch)
.context("adding archive batch")?;
// a failed broadcast send is ignored
let _ = bcast.send(BCastMsg::Batch(now, Arc::new(tbatch)));
}
// write an image once the archive grew by image_frequency since the last one
match record_config.image_frequency {
None => (),
Some(freq) if archive.len()? - last_image < freq => (),
Some(_) => {
write_image(&mut archive, &by_subid, &image, Utc::now())
.context("writing image")?;
last_image = archive.len()?;
}
}
// flush once the archive grew by flush_frequency since the last flush
match flush_frequency {
None => (),
Some(freq) if archive.len()? - last_flush < freq => (),
Some(_) => {
archive.flush_current().context("flushing archive")?;
last_flush = archive.len()?;
}
}
Ok(())
})?;
}
}
}
}
Ok(())
}