radicle_tui/
lib.rs

1pub mod event;
2pub mod store;
3pub mod task;
4pub mod terminal;
5pub mod ui;
6
7use std::fmt::Debug;
8
9use anyhow::Result;
10
11use serde::ser::{Serialize, SerializeStruct, Serializer};
12
13#[cfg(unix)]
14use tokio::signal::unix::signal;
15use tokio::sync::broadcast;
16use tokio::sync::mpsc::unbounded_channel;
17
18use ratatui::Viewport;
19
20use store::Update;
21use terminal::StdinReader;
22use ui::{Frontend, Show};
23
24use crate::task::Process;
25
/// Wrapper around a value that may or may not be produced on exit.
#[derive(Debug, Clone)]
pub struct Exit<T> {
    /// The produced value, or `None` when there is nothing to return.
    pub value: Option<T>,
}
31
32/// The output that is returned by all selection interfaces.
33#[derive(Clone, Default, Debug, Eq, PartialEq)]
34pub struct Selection<O>
35where
36    O: Serialize,
37{
38    pub operation: Option<O>,
39    pub args: Vec<String>,
40}
41
42impl<O> Selection<O>
43where
44    O: Serialize,
45{
46    pub fn with_operation(mut self, operation: O) -> Self {
47        self.operation = Some(operation);
48        self
49    }
50
51    pub fn with_args(mut self, arg: String) -> Self {
52        self.args.push(arg);
53        self
54    }
55}
56
57impl<O> Serialize for Selection<O>
58where
59    O: Serialize,
60{
61    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
62    where
63        S: Serializer,
64    {
65        let mut state = serializer.serialize_struct("", 3)?;
66        state.serialize_field("operation", &self.operation)?;
67        state.serialize_field("args", &self.args)?;
68        state.end()
69    }
70}
71
/// Marker trait for types that may be handed around inside the
/// multi-threaded application environment. It adds no methods and only
/// bundles the bounds `Clone`, `Debug`, `Send`, `Sync` and `'static`.
pub trait Share: Clone + Debug + Send + Sync + 'static {}
75
76/// Blanket implementation for all types that implement the required
77/// traits.
78impl<T: Clone + Debug + Send + Sync + 'static> Share for T {}
79
80/// A multi-producer, multi-consumer message channel.
81pub struct Channel<M> {
82    pub tx: broadcast::Sender<M>,
83    pub rx: broadcast::Receiver<M>,
84}
85
86impl<M: Clone> Default for Channel<M> {
87    fn default() -> Self {
88        let (tx, rx) = broadcast::channel(1000);
89        Self { tx, rx }
90    }
91}
92
93/// Initialize a `Store` with the `State` given and a `Frontend` with the `App` given,
94/// and run their main loops concurrently. Connect them to the `Channel` and also to
95/// an interrupt broadcast channel also initialized in this function.
96/// Additionally, a list of processors can be passed. Processors will also receive all
97/// applications messages and can emit new ones. They will be executed by an internal worker.
98pub async fn im<S, T, M, R>(
99    state: S,
100    viewport: Viewport,
101    channel: Channel<M>,
102    processors: Vec<T>,
103) -> Result<Option<R>>
104where
105    S: Update<M, Return = R> + Show<M> + Share,
106    T: Process<M> + Share,
107    M: Share,
108    R: Share,
109{
110    let (terminator, mut interrupt_rx) = create_termination();
111    let (state_tx, state_rx) = unbounded_channel();
112    let (event_tx, event_rx) = unbounded_channel();
113    let (work_tx, work_rx) = unbounded_channel();
114
115    let store = store::Store::<S, M, R>::new(state_tx.clone());
116    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
117    let frontend = Frontend::default();
118    let stdin_reader = StdinReader::default();
119
120    // TODO(erikli): Handle errors
121    let _ = tokio::try_join!(
122        worker.run(
123            processors,
124            channel.rx.resubscribe(),
125            interrupt_rx.resubscribe()
126        ),
127        store.run(
128            state,
129            terminator,
130            channel.rx.resubscribe(),
131            work_rx,
132            interrupt_rx.resubscribe(),
133        ),
134        frontend.run(
135            channel.tx,
136            state_rx,
137            event_rx,
138            interrupt_rx.resubscribe(),
139            viewport
140        ),
141        stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
142    )?;
143
144    if let Ok(reason) = interrupt_rx.recv().await {
145        match reason {
146            Interrupted::User { payload } => Ok(payload),
147            Interrupted::OsSignal => anyhow::bail!("exited because of an os sig int"),
148        }
149    } else {
150        anyhow::bail!("exited because of an unexpected error");
151    }
152}
153
154/// An `Interrupt` message that is produced by either an OS signal (e.g. kill)
155/// or the user by requesting the application to close.
156#[derive(Debug, Clone)]
157pub enum Interrupted<P>
158where
159    P: Share,
160{
161    OsSignal,
162    User { payload: Option<P> },
163}
164
165/// The `Terminator` wraps a broadcast channel and can send an interrupt messages.
166#[derive(Debug, Clone)]
167pub struct Terminator<P>
168where
169    P: Share,
170{
171    interrupt_tx: broadcast::Sender<Interrupted<P>>,
172}
173
174impl<P> Terminator<P>
175where
176    P: Share,
177{
178    /// Create a `Terminator` that stores the sending end of a broadcast channel.
179    pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
180        Self { interrupt_tx }
181    }
182
183    /// Send interrupt message to the broadcast channel.
184    pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
185        self.interrupt_tx.send(interrupted)?;
186
187        Ok(())
188    }
189}
190
191/// Receive `SIGINT` and call terminator which sends the interrupt message to its broadcast channel.
192#[cfg(unix)]
193async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
194where
195    P: Share,
196{
197    let mut interrupt_signal = signal(tokio::signal::unix::SignalKind::interrupt())
198        .expect("failed to create interrupt signal stream");
199
200    interrupt_signal.recv().await;
201
202    terminator
203        .terminate(Interrupted::OsSignal)
204        .expect("failed to send interrupt signal");
205}
206
207/// Create a broadcast channel and spawn a task for retrieving the applications' kill signal.
208pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
209where
210    P: Share,
211{
212    let (tx, rx) = broadcast::channel(1);
213    let terminator = Terminator::new(tx);
214
215    #[cfg(unix)]
216    tokio::spawn(terminate_by_unix_signal(terminator.clone()));
217
218    (terminator, rx)
219}