pub mod command;
pub mod event;
pub use command::Command;
pub use event::Event;
use radicle::storage::refs::FeatureLevel;
use std::collections::{BTreeMap, VecDeque};
use std::num::NonZeroUsize;
use std::time;
use radicle_core::{NodeId, RepoId};
use crate::fetcher::RefsToFetch;
use crate::service::FETCH_TIMEOUT;
/// Maximum number of fetches that may wait in a single node's queue.
pub const MAX_FETCH_QUEUE_SIZE: usize = 128;
/// Default per-node fetch concurrency. `NonZeroUsize::MIN` is 1, i.e. at most
/// one active fetch per node at a time.
pub const MAX_CONCURRENCY: NonZeroUsize = NonZeroUsize::MIN;
/// Configuration for a single fetch: an overall timeout plus the underlying
/// `radicle_fetch` protocol configuration.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct FetchConfig {
/// Upper bound on how long the fetch may run (defaults to `FETCH_TIMEOUT`).
timeout: time::Duration,
/// Protocol-level settings (ref limits, minimum feature level, ...).
protocol: radicle_fetch::Config,
}
impl FetchConfig {
pub fn new() -> Self {
Self {
timeout: FETCH_TIMEOUT,
protocol: radicle_fetch::Config::default(),
}
}
pub fn with_timeout(mut self, timeout: time::Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_fetch_config(mut self, config: radicle_fetch::Config) -> Self {
self.protocol = config;
self
}
pub fn with_minimum_feature_level(mut self, feature_level: FeatureLevel) -> Self {
self.protocol.level_min = feature_level;
self
}
pub fn timeout(&self) -> time::Duration {
self.timeout
}
pub fn fetch_config(&self) -> radicle_fetch::Config {
self.protocol
}
fn merge(&mut self, other: FetchConfig) {
self.timeout = self.timeout.max(other.timeout);
self.protocol.limit.refs = self.protocol.limit.refs.max(other.protocol.limit.refs);
self.protocol.limit.special = self
.protocol
.limit
.special
.max(other.protocol.limit.special);
self.protocol.level_min = self.protocol.level_min.min(other.protocol.level_min);
}
}
impl Default for FetchConfig {
fn default() -> Self {
Self::new()
}
}
/// Tracks which fetches are currently running and which are waiting, per node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FetcherState {
/// Fetches currently in flight, at most one per repository.
active: BTreeMap<RepoId, ActiveFetch>,
/// Pending fetches, one queue per remote node.
queues: BTreeMap<NodeId, Queue>,
/// Concurrency and queue-size limits.
config: Config,
}
impl Default for FetcherState {
fn default() -> Self {
Self::new(Config::default())
}
}
impl FetcherState {
pub fn new(config: Config) -> Self {
Self {
active: BTreeMap::new(),
queues: BTreeMap::new(),
config,
}
}
}
impl FetcherState {
/// Dispatch a [`Command`] to the matching handler and wrap the result as an
/// [`Event`].
pub fn handle(&mut self, command: Command) -> Event {
match command {
Command::Fetch(fetch) => self.fetch(fetch).into(),
Command::Fetched(fetched) => self.fetched(fetched).into(),
Command::Cancel(cancel) => self.cancel(cancel).into(),
}
}
/// Start a fetch of `rid` from `from`, or queue it if it cannot run now.
///
/// Outcomes:
/// * `AlreadyFetching` — an identical fetch (same `from` and same `refs`)
///   is already active for this repository.
/// * queued (via `enqueue`) — a different fetch is active for this
///   repository, or `from` is already at its concurrency limit.
/// * `Started` — otherwise; the fetch is recorded in the active set.
pub fn fetch(
&mut self,
command::Fetch {
from,
rid,
refs,
config,
}: command::Fetch,
) -> event::Fetch {
if let Some(active) = self.active.get(&rid) {
if active.refs == refs && active.from == from {
return event::Fetch::AlreadyFetching { rid, from };
} else {
// Same repo but a different peer or ref-set: wait for the
// current fetch to finish.
return self.enqueue(rid, from, refs, config);
}
}
if self.is_at_node_capacity(&from) {
self.enqueue(rid, from, refs, config)
} else {
self.active.insert(
rid,
ActiveFetch {
from,
refs: refs.clone(),
},
);
event::Fetch::Started {
rid,
from,
refs,
config,
}
}
}
/// Mark the active fetch for `rid` as finished, removing it from the
/// active set.
///
/// NOTE(review): in the `Completed` branch the destructuring shadows the
/// command's `from` with the *recorded* active fetch's `from`, and the
/// removal does not verify that the two match — confirm this is
/// intentional.
pub fn fetched(&mut self, command::Fetched { from, rid }: command::Fetched) -> event::Fetched {
match self.active.remove(&rid) {
None => event::Fetched::NotFound { from, rid },
Some(ActiveFetch { from, refs }) => event::Fetched::Completed { from, rid, refs },
}
}
/// Pop the next runnable queued fetch for `from`, if any.
///
/// The head of the queue is runnable only when `from` is below its
/// concurrency limit and no fetch for the same repository is currently
/// active; otherwise the head is left in place and `None` is returned.
pub fn dequeue(&mut self, from: &NodeId) -> Option<QueuedFetch> {
// Capacity is computed up front because `try_dequeue` holds a mutable
// borrow of the queue while the predicate reads `active`.
let is_at_capacity = self.is_at_node_capacity(from);
let queue = self.queues.get_mut(from)?;
let active = &self.active;
queue.try_dequeue(|QueuedFetch { rid, .. }| !is_at_capacity && !active.contains_key(rid))
}
/// Cancel everything associated with `from`: all of its active fetches and
/// its entire queue.
///
/// Returns `Unexpected` when there was nothing to cancel, otherwise
/// `Canceled` carrying the removed active and queued fetches.
pub fn cancel(&mut self, command::Cancel { from }: command::Cancel) -> event::Cancel {
// Collect the repo ids first so `active` can be mutated without holding
// an iterator borrow.
let cancelled: Vec<_> = self
.active
.iter()
.filter_map(|(rid, f)| (f.from == from).then_some(*rid))
.collect();
let ongoing: BTreeMap<_, _> = cancelled
.iter()
.filter_map(|rid| self.active.remove(rid).map(|f| (*rid, f)))
.collect();
// Normalize empties to `None` so the match below can detect "nothing
// was cancelled at all".
let ongoing = (!ongoing.is_empty()).then_some(ongoing);
let queued = self.queues.remove(&from).filter(|queue| !queue.is_empty());
match (ongoing, queued) {
(None, None) => event::Cancel::Unexpected { from },
(ongoing, queued) => event::Cancel::Canceled {
from,
active: ongoing.unwrap_or_default(),
queued: queued.map(|q| q.queue).unwrap_or_default(),
},
}
}
/// Push a fetch onto `from`'s queue (creating the queue on first use),
/// merging with an already-queued fetch for the same repository if present.
fn enqueue(
&mut self,
rid: RepoId,
from: NodeId,
refs: RefsToFetch,
config: FetchConfig,
) -> event::Fetch {
let queue = self
.queues
.entry(from)
.or_insert(Queue::new(self.config.maximum_queue_size));
match queue.enqueue(QueuedFetch { rid, refs, config }) {
Enqueue::CapacityReached(QueuedFetch { rid, refs, config }) => {
// The queue is full: hand the rejected fetch back to the caller.
event::Fetch::QueueAtCapacity {
rid,
from,
refs,
config,
capacity: queue.len(),
}
}
// Both a fresh insertion and a merge surface as `Queued`.
Enqueue::Queued => event::Fetch::Queued { rid, from },
Enqueue::Merged => event::Fetch::Queued { rid, from },
}
}
}
impl FetcherState {
    /// All per-node queues of pending fetches.
    pub fn queued_fetches(&self) -> &BTreeMap<NodeId, Queue> {
        &self.queues
    }

    /// All currently running fetches, keyed by repository.
    pub fn active_fetches(&self) -> &BTreeMap<RepoId, ActiveFetch> {
        &self.active
    }

    /// The running fetch for `rid`, if there is one.
    pub fn get_active_fetch(&self, rid: &RepoId) -> Option<&ActiveFetch> {
        self.active.get(rid)
    }

    /// Whether `node` already has as many active fetches as the configured
    /// concurrency limit allows.
    fn is_at_node_capacity(&self, node: &NodeId) -> bool {
        let limit = usize::from(self.config.maximum_concurrency);
        let running = self
            .active
            .values()
            .filter(|fetch| &fetch.from == node)
            .count();
        running >= limit
    }
}
/// Limits governing the fetcher state machine.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Config {
/// Maximum number of simultaneously active fetches per node.
maximum_concurrency: NonZeroUsize,
/// Maximum number of fetches that may wait in each node's queue.
maximum_queue_size: MaxQueueSize,
}
impl Config {
    /// Same as [`Config::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Builder-style setter for the per-node queue capacity.
    pub fn with_max_capacity(self, capacity: MaxQueueSize) -> Self {
        Self {
            maximum_queue_size: capacity,
            ..self
        }
    }

    /// Builder-style setter for the per-node concurrency limit.
    pub fn with_max_concurrency(self, concurrency: NonZeroUsize) -> Self {
        Self {
            maximum_concurrency: concurrency,
            ..self
        }
    }
}
impl Default for Config {
fn default() -> Self {
Self {
maximum_concurrency: MAX_CONCURRENCY,
maximum_queue_size: MaxQueueSize::default(),
}
}
}
/// A fetch that is currently in flight.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ActiveFetch {
/// The node the repository is being fetched from.
pub from: NodeId,
/// The refs requested for this fetch.
pub refs: RefsToFetch,
}
impl ActiveFetch {
    /// The node this fetch pulls from (also available as the public `from`
    /// field).
    pub fn from(&self) -> &NodeId {
        &self.from
    }

    /// The refs requested by this fetch (also available as the public `refs`
    /// field).
    pub fn refs(&self) -> &RefsToFetch {
        &self.refs
    }
}
/// A fetch waiting in a node's queue.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueuedFetch {
/// The repository to fetch.
pub rid: RepoId,
/// The refs to fetch for that repository.
pub refs: RefsToFetch,
/// The configuration the fetch should run with.
pub config: FetchConfig,
}
/// A bounded FIFO of fetches waiting to run for a single node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Queue {
/// The waiting fetches; the front is the next to run.
queue: VecDeque<QueuedFetch>,
/// Upper bound on `queue.len()`, enforced by `enqueue`.
max_queue_size: MaxQueueSize,
}
/// Capacity bound for a [`Queue`]. Every constructor guarantees the inner
/// value is at least 1.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaxQueueSize(usize);

impl MaxQueueSize {
    /// The smallest allowed capacity: a single queued fetch.
    pub const MIN: Self = MaxQueueSize(1);

    /// Build a capacity from a non-zero size.
    pub fn new(size: NonZeroUsize) -> Self {
        MaxQueueSize(usize::from(size))
    }

    /// The capacity as a plain `usize`.
    pub fn as_usize(&self) -> usize {
        self.0
    }

    /// True when a queue already holding `n` entries has no room left,
    /// i.e. `n >= capacity` (despite the name, equality counts too).
    fn is_exceeded_by(&self, n: usize) -> bool {
        n >= self.0
    }
}
impl Default for MaxQueueSize {
fn default() -> Self {
Self(MAX_FETCH_QUEUE_SIZE)
}
}
/// Outcome of [`Queue::enqueue`].
#[must_use]
#[derive(Debug, PartialEq, Eq)]
pub(super) enum Enqueue {
/// The queue was already full; the rejected fetch is handed back.
CapacityReached(QueuedFetch),
/// The fetch was appended to the back of the queue.
Queued,
/// A fetch for the same repository was already queued; refs and config
/// were merged into the existing entry.
Merged,
}
impl Queue {
pub(super) fn new(max_queue_size: MaxQueueSize) -> Self {
Self {
queue: VecDeque::with_capacity(max_queue_size.0),
max_queue_size,
}
}
pub(super) fn len(&self) -> usize {
self.queue.len()
}
pub(super) fn is_empty(&self) -> bool {
self.queue.is_empty()
}
pub(super) fn enqueue(&mut self, fetch: QueuedFetch) -> Enqueue {
if let Some(existing) = self.queue.iter_mut().find(|qf| qf.rid == fetch.rid) {
existing.refs = existing.refs.clone().merge(fetch.refs);
existing.config.merge(fetch.config);
return Enqueue::Merged;
}
if self.max_queue_size.is_exceeded_by(self.queue.len()) {
Enqueue::CapacityReached(fetch)
} else {
self.queue.push_back(fetch);
Enqueue::Queued
}
}
pub(super) fn try_dequeue<P>(&mut self, predicate: P) -> Option<QueuedFetch>
where
P: FnOnce(&QueuedFetch) -> bool,
{
let fetch = self.dequeue()?;
if predicate(&fetch) {
Some(fetch)
} else {
self.queue.push_front(fetch);
None
}
}
pub(super) fn dequeue(&mut self) -> Option<QueuedFetch> {
self.queue.pop_front()
}
pub fn iter<'a>(&'a self) -> QueueIter<'a> {
QueueIter {
inner: self.queue.iter(),
}
}
}
/// Borrowing iterator over a [`Queue`], yielding fetches front to back.
pub struct QueueIter<'a> {
inner: std::collections::vec_deque::Iter<'a, QueuedFetch>,
}
impl<'a> Iterator for QueueIter<'a> {
    type Item = &'a QueuedFetch;

    /// Delegate straight to the underlying `VecDeque` iterator.
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    /// Forward the inner iterator's exact size hint so that `collect` and
    /// similar consumers can preallocate; without this override the default
    /// `(0, None)` hint is used.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
impl<'a> IntoIterator for &'a Queue {
type Item = &'a QueuedFetch;
type IntoIter = QueueIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}