pub mod event;
pub mod store;
pub mod task;
pub mod terminal;
pub mod ui;
use std::fmt::Debug;
use anyhow::Result;
use serde::ser::{Serialize, SerializeStruct, Serializer};
#[cfg(unix)]
use tokio::signal::unix::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;
use ratatui::Viewport;
use store::Update;
use terminal::StdinReader;
use ui::{Frontend, Show};
use crate::task::Process;
/// Wrapper around an optional value of type `T`.
///
/// NOTE(review): presumably carries the value an application exits with;
/// nothing in this file constructs or reads it — confirm against callers.
#[derive(Debug, Clone)]
pub struct Exit<T> {
    /// The wrapped value, or `None` when there is nothing to report.
    pub value: Option<T>,
}
/// An optional operation of type `O` together with a list of string arguments.
///
/// The default value has no operation and an empty argument list.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Selection<O>
where
    O: Serialize,
{
    /// The selected operation, if one has been set.
    pub operation: Option<O>,
    /// Arguments accompanying the operation, in insertion order.
    pub args: Vec<String>,
}
impl<O> Selection<O>
where
    O: Serialize,
{
    /// Returns the selection with `operation` set, replacing any previous one.
    ///
    /// The argument list is carried over unchanged.
    pub fn with_operation(self, operation: O) -> Self {
        Self {
            operation: Some(operation),
            ..self
        }
    }

    /// Returns the selection with `arg` appended to the end of its arguments.
    ///
    /// Despite the plural name, each call adds exactly one argument; chain
    /// calls to add several.
    pub fn with_args(self, arg: String) -> Self {
        let Self {
            operation,
            mut args,
        } = self;
        args.push(arg);
        Self { operation, args }
    }
}
/// Serializes a [`Selection`] as a struct with the two fields `operation`
/// and `args`, in that order.
impl<O> Serialize for Selection<O>
where
    O: Serialize,
{
    /// Writes `operation` and `args` through `serializer`.
    ///
    /// # Errors
    ///
    /// Returns the serializer's error if starting the struct, writing either
    /// field, or finishing the struct fails.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // The length passed to `serialize_struct` must equal the number of
        // fields actually written. Exactly two fields are emitted below;
        // declaring three would make serializers that emit the length up
        // front produce a wrong header or reject the value.
        //
        // NOTE(review): the struct name is kept as the empty string to avoid
        // changing output in formats that emit the name — confirm whether it
        // should be "Selection".
        let mut state = serializer.serialize_struct("", 2)?;
        state.serialize_field("operation", &self.operation)?;
        state.serialize_field("args", &self.args)?;
        state.end()
    }
}
/// Shorthand for the bounds `Clone + Debug + Send + Sync + 'static`.
///
/// The trait has no methods and is implemented automatically for every type
/// that satisfies those bounds, so it never needs a manual implementation.
pub trait Share: Clone + Debug + Send + Sync + 'static {}

/// Blanket implementation: any type meeting the bounds is `Share`.
impl<T> Share for T where T: Clone + Debug + Send + Sync + 'static {}
/// Both ends of a tokio broadcast channel carrying messages of type `M`.
pub struct Channel<M> {
/// Sending half of the broadcast channel.
pub tx: broadcast::Sender<M>,
/// Receiving half of the broadcast channel.
pub rx: broadcast::Receiver<M>,
}
impl<M: Clone> Default for Channel<M> {
fn default() -> Self {
let (tx, rx) = broadcast::channel(1000);
Self { tx, rx }
}
}
pub async fn im<S, T, M, R>(
state: S,
viewport: Viewport,
channel: Channel<M>,
processors: Vec<T>,
) -> Result<Option<R>>
where
S: Update<M, Return = R> + Show<M> + Share,
T: Process<M> + Share,
M: Share,
R: Share,
{
let (terminator, mut interrupt_rx) = create_termination();
let (state_tx, state_rx) = unbounded_channel();
let (event_tx, event_rx) = unbounded_channel();
let (work_tx, work_rx) = unbounded_channel();
let store = store::Store::<S, M, R>::new(state_tx.clone());
let worker = task::Worker::<T, M, R>::new(work_tx.clone());
let frontend = Frontend::default();
let stdin_reader = StdinReader::default();
let _ = tokio::try_join!(
worker.run(
processors,
channel.rx.resubscribe(),
interrupt_rx.resubscribe()
),
store.run(
state,
terminator,
channel.rx.resubscribe(),
work_rx,
interrupt_rx.resubscribe(),
),
frontend.run(
channel.tx,
state_rx,
event_rx,
interrupt_rx.resubscribe(),
viewport
),
stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
)?;
if let Ok(reason) = interrupt_rx.recv().await {
match reason {
Interrupted::User { payload } => Ok(payload),
Interrupted::OsSignal => anyhow::bail!("exited because of an os sig int"),
}
} else {
anyhow::bail!("exited because of an unexpected error");
}
}
/// Reason broadcast to the running components when the application is asked
/// to stop.
#[derive(Clone, Debug)]
pub enum Interrupted<P>
where
    P: Share,
{
    /// The process received an operating-system interrupt signal.
    OsSignal,
    /// Termination was requested from within the application.
    User {
        /// Optional value to hand back to the caller of [`im`].
        payload: Option<P>,
    },
}
/// Cloneable handle used to broadcast an [`Interrupted`] notification.
#[derive(Clone, Debug)]
pub struct Terminator<P>
where
    P: Share,
{
    /// Sending half of the interruption broadcast channel.
    interrupt_tx: broadcast::Sender<Interrupted<P>>,
}
impl<P> Terminator<P>
where
P: Share,
{
pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
Self { interrupt_tx }
}
pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
self.interrupt_tx.send(interrupted)?;
Ok(())
}
}
/// Waits for the interrupt signal stream to yield once, then broadcasts
/// [`Interrupted::OsSignal`] through `terminator`.
///
/// Only compiled on Unix targets.
///
/// # Panics
///
/// Panics if the signal stream cannot be created or if the interruption
/// cannot be broadcast.
#[cfg(unix)]
async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
where
    P: Share,
{
    use tokio::signal::unix::SignalKind;

    let mut sigint_stream =
        signal(SignalKind::interrupt()).expect("failed to create interrupt signal stream");

    // The yielded value carries no information; only the wake-up matters.
    let _ = sigint_stream.recv().await;

    let broadcast_result = terminator.terminate(Interrupted::OsSignal);
    broadcast_result.expect("failed to send interrupt signal");
}
/// Creates the interruption channel and returns its [`Terminator`] together
/// with the first receiver.
///
/// The underlying broadcast channel has a capacity of one message. On Unix
/// targets a background task is also spawned with a clone of the terminator;
/// it broadcasts [`Interrupted::OsSignal`] once an interrupt signal arrives.
///
/// # Panics
///
/// On Unix targets this calls `tokio::spawn`, which panics when invoked
/// outside a Tokio runtime.
pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
where
    P: Share,
{
    let (interrupt_tx, interrupt_rx) = broadcast::channel::<Interrupted<P>>(1);
    let terminator = Terminator::new(interrupt_tx);

    #[cfg(unix)]
    tokio::spawn(terminate_by_unix_signal(terminator.clone()));

    (terminator, interrupt_rx)
}