usenetnews-dynexp2 0.1.2

USENET news server expiry time dynamic tuning (for INN2)
Documentation
// Copyright 2022 Ian Jackson
// SPDX-License-Identifier: GPL-3.0-or-later
// There is NO WARRANTY.

use crate::prelude::*;

pub fn determine_free_space(
  path: &str,
) -> AR<Space> {
  let stab = nix::sys::statvfs::statvfs(path).context("statvfs")?;
  let bytes = stab.block_size() as f64 * stab.blocks_free() as f64;
  let kb = bytes / 1024.0;
  Ok(Space(kb))
}

impl Parameters {
  pub fn check(&self) -> AR<()> {
    (||{
      if !(
        self.flood <= self.highwater &&
        self.highwater <= self.lowwater
      ) {
        return Err(anyhow!("need flood <= highwater <= lowwater"));
      }

      if !(
        DaysOrNever::from(self.mindays + MIN_INTERVAL) <= self.maxdays
      ) {
        return Err(anyhow!("need mindays < maxdays by at least {}",
                           MIN_INTERVAL));
      }

      AOk(())
    })().context("bad parameters")
  }

  fn situation(&self, space: Space) -> Situation {
    if      space <  self.flood     { Situation::Flood }
    else if space <  self.highwater { Situation::High  }
    else if space <= self.lowwater  { Situation::Ok    }
    else                            { Situation::Low   }
  }
}    

impl ActiveData {
  pub fn cfg_range(&self) -> DaysRange {
    DaysRange([
      self.params.mindays,
      self.maxdays,
    ])
  }

  fn get_precise(&self) -> Days { self.line.def }

  fn set_unclamped(&mut self, new: Days, mut cl: CatLogger) -> bool {
    let changed = Days((self.get_precise() - new).0.abs()) >= ROUNDING_ERROR;
    self.line.def = new;
    cl.log(self, changed);
    changed
  }

  pub fn get_rounded(&self) -> [Days; 2] {
    [ -0.5, 0.5 ].map(|d| self.get_precise() + ROUNDING_ERROR * d)
  }

  pub fn current_lambda(&self) -> [Lambda; 2] {
    self.get_rounded().map(|v| self.cfg_range().mk_lambda(v))
  }

  pub fn increase_lambda_limit(&self) -> Lambda {
    let new = self.get_rounded()[1] + self.params.increase;
    let new = self.cfg_range().clamp(new);
    let lambda = self.cfg_range().mk_lambda(new);
    lambda
  }

  fn increase_to_lambda(&mut self, target: Lambda, cl: CatLogger) -> bool {
    let new = self.cfg_range().mk_days(target);

    // Don't increase by more than .increase, or reduce,
    // or increase to more than the maximum
    let old = self.get_precise();
    let new = new.clamp(
      old,
      cmp::min(
        self.params.increase + old,
        self.maxdays,
      )
    );

    self.set_unclamped(new, cl)
  }

  fn rebalance_towards(&mut self, target: Lambda, cl: CatLogger) -> bool {
    let new = self.cfg_range().mk_days(target);

    // Don't reduce by more than .reduce, or increase,
    // or reduce to more than the minimum
    let old = self.get_precise();
    let new = new.clamp(
      cmp::max(
        old - self.params.reduce,
        self.params.mindays,
      ),
      old,
    );

    self.set_unclamped(new, cl)
  }
}

struct ContextCore<'c> {
  spec: &'c SpoolSpec,
  free_space: Space,
  log: &'c mut LogCollection,
}

struct SpoolContext<'c> {
  cats: Cats<'c>,
  c: ContextCore<'c>,
}

struct CatContext<'c, 'cc:'c> {
  situation: Situation,
  c: &'c mut ContextCore<'cc>,
}

type CatEntries<'r> = Vec<RefMut<'r, ActiveData>>;
type Cats<'r> = BTreeMap::<Situation, CatEntries<'r>>;

impl SpoolContext<'_> {
  fn with_cats<XA>(
    &mut self,
    situation: Situation,
    f: fn(&mut CatEntries, CatContext, XA) -> AR<()>,
    xa: XA
  ) -> AR<()> {
    if let Some(entries) = self.cats.get_mut(&situation) {
      let cat_context = CatContext {
        situation,
        c: &mut self.c,
      };
      f(entries, cat_context, xa)?;
    }
    Ok(())
  }
}

struct CatLogger<'cr, 'c:'cr, 'cc:'c> {
  cc: &'cr mut CatContext<'c, 'cc>,
  actions: [&'static str; 2],
}

impl CatLogger<'_,'_,'_> {
  fn log(&mut self, ent: &ActiveData, changed: bool) {
    let action = self.actions[changed as usize];
    let action = LogAction {
      action,
      situation: self.cc.situation,
      was: ent.was,
      now: ent.get_precise(),
      now_lambda: ent.cfg_range().mk_lambda(ent.get_precise()),
    };
    self.cc.c.log.log(
      &ent.line.pattern,
      self.cc.c.free_space,
      &ent.params,
      action,
    );
  }
}

impl<'c,'cc> CatContext<'c,'cc> {
  fn logger(&mut self, actions: [&'static str; 2]) -> CatLogger<'_,'c,'cc> {
    CatLogger {
      cc: self,
      actions,
    }
  }
}

fn reduce(
  entries: &mut CatEntries,
  mut cc: CatContext,
  get_reduction: &dyn Fn(&Parameters) -> Days,
) -> AR<()> {
  let mut count_by_changed = [0; 2];

  for ent in entries {
    let reduction = get_reduction(&ent.params);

    let new = ent.get_precise() - reduction;

    // Don't reduce to less than the minimum, or increase.
    let new = new.clamp(
      ent.params.mindays,
      new,
    );

    let changed = ent.set_unclamped(
      new,
      cc.logger(["clamped", "reduced"]),
    );

    count_by_changed[changed as usize] += 1;
  }

  if count_by_changed[0] !=0 && count_by_changed[1] == 0 {
    eprintln!(
      "dynexp: warning: {}: {:?} but (all) reduction limits reached!",
      cc.c.spec.spool, cc.situation,
    );
  }
  
  Ok(())
}

fn increase(
  entries: &mut CatEntries,
  mut cc: CatContext,
  _: (),
) -> AR<()> {
  let target_lambda = entries.iter()
    .map(|ent| ent.increase_lambda_limit())
    .min().expect("empty category");

  entries.iter_mut()
    .for_each(|ent| {
      ent.increase_to_lambda(
        target_lambda,
        cc.logger(["unchanged", "increased"])
      );
    });

  Ok(())
}

fn rebalance(
  entries: &mut CatEntries,
  mut cc: CatContext,
  _: (),
) -> AR<()> {
  // Find the smallest lambda, but rounding each one up
  let worst = entries.iter()
    .map(|ent| ent.current_lambda()[1])
    .min().expect("empty category");

  entries.iter_mut()
    .for_each(|ent| {
      ent.rebalance_towards(worst, cc.logger(["unchanged", "trimmed"]));
    });

  Ok(())
}

// Returns number of kilobytes
pub type MockSpoolLookup = dyn Fn(&str) -> AR<f64> + 'static;

impl SpoolSpec {
  pub fn update(
    &self,
    entries: &mut SpoolEntries,
    log: &mut LogCollection,
    free_space: Space,
  ) -> AR<()> {
    let mut cats = Cats::new();

    for entry in &entries.entries {
      let entry = entry.borrow_mut();
      let situation = entry.params.situation(free_space);
      cats.entry(situation).or_default().push(entry);
    }

    let mut sctx = SpoolContext {
      cats,
      c: ContextCore {
        spec: self,
        free_space,
        log,
      },
    };

    sctx.with_cats(
      Situation::Flood,
      reduce,
      &|p| p.flooddrain,
    )?;

    sctx.with_cats(
      Situation::High,
      reduce,
      &|p| p.reduce,
    )?;

    sctx.with_cats(
      Situation::Low,
      increase,
      (),
    )?;
    
    sctx.with_cats(
      Situation::Ok,
      rebalance,
      (),
    )?;

    Ok(())
  }
}

impl Loaded {
  pub fn update_general(
    &mut self,
    logs: &mut LogCollection,
    mock_spool_lookup: Option<&MockSpoolLookup>
  ) -> AR<()> {
    for (spool, entries) in &mut self.spools {
      let path = &spool.spool;

      let free_space = if let Some(mock) = mock_spool_lookup {
        mock(path).map(Space)
      } else {
        determine_free_space(path)
      }.with_context(|| path.to_string())?;

      spool.update(entries, logs, free_space)?;
    }
    Ok(())
  }
}