use std::{
sync::Arc,
task::{Context, Poll},
};
use futures::future::join_all;
use kithara_abr::{Abr, AbrHandle, AbrPeerId};
use kithara_events::{EventBus, RequestPriority};
use kithara_net::NetError;
use kithara_platform::{
CancelGroup, RwLock,
tokio::sync::{mpsc, oneshot},
};
use tokio_util::sync::CancellationToken;
use super::{cmd::FetchCmd, downloader::DownloaderInner, response::FetchResponse};
/// Extension of [`Abr`] with hooks for supplying fetch commands and a
/// request priority.
///
/// Both methods have default bodies, so an implementor only overrides the
/// ones it needs.
pub trait Peer: Abr {
/// Polls the peer for its next batch of fetch commands.
///
/// The default implementation ignores the context and immediately returns
/// `Poll::Ready(None)`.
/// NOTE(review): `None` presumably means "no further commands", as with a
/// finished stream — confirm against the code that drives this method.
fn poll_next(&self, _cx: &mut Context<'_>) -> Poll<Option<Vec<FetchCmd>>> {
Poll::Ready(None)
}
/// Request priority for this peer; the default is [`RequestPriority::Low`].
fn priority(&self) -> RequestPriority {
RequestPriority::Low
}
}
/// Destination for the outcome of an [`InternalCmd`] (stored in its
/// `response` field).
pub(super) enum ResponseTarget {
/// Deliver the result through a one-shot channel. `PeerHandle` builds this
/// variant for `execute`/`batch` requests and awaits the receiving half.
Channel(oneshot::Sender<Result<FetchResponse, NetError>>),
/// No reply channel is attached.
/// NOTE(review): presumably used for commands obtained through
/// `Peer::poll_next` — confirm where this variant is constructed.
Streaming,
}
/// An [`InternalCmd`] paired with a cancellation token.
///
/// NOTE(review): nothing in this part of the file constructs the type;
/// presumably it is an entry in the downloader's queue/slot storage — confirm.
pub(super) struct SlotEntry {
/// Cancellation token tied to the peer (going by the field name).
pub(super) peer_cancel: CancellationToken,
/// The command held by this entry.
pub(super) cmd: InternalCmd,
}
/// A [`FetchCmd`] plus the bookkeeping that travels with it through the
/// command queue.
///
/// The notes about "imperative" requests below describe what
/// `PeerHandle::make_imperative` fills in for `execute`/`batch`.
pub(super) struct InternalCmd {
/// ABR peer id of the issuing peer.
pub(super) peer_id: AbrPeerId,
/// Cancellation group for this request; imperative requests seed it with a
/// child of the handle's token.
pub(super) cancel: CancelGroup,
/// The fetch command itself.
pub(super) cmd: FetchCmd,
/// Timestamp captured when the command was built.
pub(super) enqueued_at: kithara_platform::time::Instant,
/// Event bus snapshot taken when the command was built, if one was set.
pub(super) bus: Option<EventBus>,
/// Optional arena index; always `None` for imperative requests.
/// NOTE(review): presumably the index of a registered `Peer` — confirm.
pub(super) peer: Option<thunderdome::Index>,
/// Request id; imperative requests obtain it from the downloader's
/// `next_request_id`.
pub(super) request_id: kithara_events::RequestId,
/// Request priority; imperative requests use `RequestPriority::High`.
pub(super) priority: RequestPriority,
/// Where the outcome of the request is delivered.
pub(super) response: ResponseTarget,
}
/// State shared by every clone of a `PeerHandle`.
///
/// Dropping it cancels `cancel` (see the `Drop` impl). Field order is also
/// the order in which the fields are dropped.
struct PeerInner {
/// ABR handle for this peer; source of the peer id.
abr: AbrHandle,
/// Shared downloader state.
/// NOTE(review): despite the leading underscore this field is read by
/// `PeerHandle::make_imperative` to allocate request ids; consider renaming
/// it to `pool`.
_pool: Arc<DownloaderInner>,
/// Shared, replaceable event-bus slot (written by `PeerHandle::with_bus`).
bus: Arc<RwLock<Option<EventBus>>>,
/// Peer-wide cancellation token; every imperative request derives a child
/// token from it.
cancel: CancellationToken,
/// Sending half of the command queue.
cmd_tx: mpsc::Sender<InternalCmd>,
}
impl Drop for PeerInner {
fn drop(&mut self) {
self.cancel.cancel();
}
}
/// Cloneable handle used to submit fetch commands on behalf of one peer.
///
/// All clones share one `PeerInner` through an `Arc`; that shared state (and
/// with it the peer's cancellation token) is released when the last clone is
/// dropped.
#[derive(Clone)]
pub struct PeerHandle {
/// Reference-counted shared state.
inner: Arc<PeerInner>,
}
/// Opaque `Debug` output: prints the type name and elides every field.
impl std::fmt::Debug for PeerHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut opaque = f.debug_struct("PeerHandle");
        opaque.finish_non_exhaustive()
    }
}
impl PeerHandle {
pub(super) fn new(
pool: Arc<DownloaderInner>,
cancel: CancellationToken,
cmd_tx: mpsc::Sender<InternalCmd>,
bus: Arc<RwLock<Option<EventBus>>>,
abr: AbrHandle,
) -> Self {
Self {
inner: Arc::new(PeerInner {
cancel,
cmd_tx,
bus,
abr,
_pool: pool,
}),
}
}
#[must_use]
pub fn abr(&self) -> &AbrHandle {
&self.inner.abr
}
pub async fn batch(&self, cmds: Vec<FetchCmd>) -> Vec<Result<FetchResponse, NetError>> {
let mut receivers: Vec<Option<oneshot::Receiver<Result<FetchResponse, NetError>>>> =
Vec::with_capacity(cmds.len());
let bus = self.bus();
let peer_id = self.inner.abr.peer_id();
for cmd in cmds {
let (internal, resp_rx) = self.make_imperative(cmd, bus.clone(), peer_id);
if self.inner.cmd_tx.send(internal).await.is_err() {
receivers.push(None);
continue;
}
receivers.push(Some(resp_rx));
}
join_all(receivers.into_iter().map(|rx| async move {
match rx {
Some(resp_rx) => resp_rx.await.unwrap_or(Err(NetError::Cancelled)),
None => Err(NetError::Cancelled),
}
}))
.await
}
#[must_use]
pub fn bus(&self) -> Option<EventBus> {
self.inner.bus.lock_sync_read().clone()
}
#[must_use]
pub fn cancel(&self) -> CancellationToken {
self.inner.cancel.clone()
}
pub async fn execute(&self, cmd: FetchCmd) -> Result<FetchResponse, NetError> {
let (internal, resp_rx) = self.make_imperative(cmd, self.bus(), self.inner.abr.peer_id());
self.inner
.cmd_tx
.send(internal)
.await
.map_err(|_| NetError::Cancelled)?;
resp_rx.await.map_err(|_| NetError::Cancelled)?
}
fn make_imperative(
&self,
cmd: FetchCmd,
bus: Option<EventBus>,
peer_id: AbrPeerId,
) -> (
InternalCmd,
oneshot::Receiver<Result<FetchResponse, NetError>>,
) {
let cancel = CancelGroup::new(vec![self.inner.cancel.child_token()]);
let (resp_tx, resp_rx) = oneshot::channel();
let request_id = self.inner._pool.next_request_id();
let enqueued_at = kithara_platform::time::Instant::now();
let internal = InternalCmd {
cmd,
cancel,
bus,
peer_id,
request_id,
enqueued_at,
priority: RequestPriority::High,
response: ResponseTarget::Channel(resp_tx),
peer: None,
};
(internal, resp_rx)
}
#[must_use]
pub fn peer_id(&self) -> AbrPeerId {
self.inner.abr.peer_id()
}
#[must_use]
pub fn with_bus(self, bus: EventBus) -> Self {
*self.inner.bus.lock_sync_write() = Some(bus.clone());
let _ = self.inner.abr.clone().with_bus(bus);
self
}
}