1pub mod event;
2pub mod store;
3pub mod task;
4pub mod terminal;
5pub mod ui;
6
7use std::fmt::Debug;
8
9use anyhow::Result;
10
11use serde::ser::{Serialize, SerializeStruct, Serializer};
12
13#[cfg(unix)]
14use tokio::signal::unix::signal;
15use tokio::sync::broadcast;
16use tokio::sync::mpsc::unbounded_channel;
17
18use ratatui::Viewport;
19
20use store::Update;
21use terminal::StdinReader;
22use ui::{Frontend, Show};
23
24use crate::task::Process;
25
/// Container for an optional value of type `T`.
///
/// NOTE(review): judging by the name, this is presumably the value handed back
/// when an application exits — confirm against the callers.
#[derive(Debug, Clone)]
pub struct Exit<T> {
    /// The wrapped value, or `None` when there is nothing to hand back.
    pub value: Option<T>,
}
31
32#[derive(Clone, Default, Debug, Eq, PartialEq)]
34pub struct Selection<O>
35where
36 O: Serialize,
37{
38 pub operation: Option<O>,
39 pub args: Vec<String>,
40}
41
42impl<O> Selection<O>
43where
44 O: Serialize,
45{
46 pub fn with_operation(mut self, operation: O) -> Self {
47 self.operation = Some(operation);
48 self
49 }
50
51 pub fn with_args(mut self, arg: String) -> Self {
52 self.args.push(arg);
53 self
54 }
55}
56
57impl<O> Serialize for Selection<O>
58where
59 O: Serialize,
60{
61 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
62 where
63 S: Serializer,
64 {
65 let mut state = serializer.serialize_struct("", 3)?;
66 state.serialize_field("operation", &self.operation)?;
67 state.serialize_field("args", &self.args)?;
68 state.end()
69 }
70}
71
/// Marker trait bundling the bounds `Clone + Debug + Send + Sync + 'static`.
///
/// It has no methods of its own; it only serves as a shorthand for the
/// combined bounds in generic signatures.
pub trait Share: Clone + Debug + Send + Sync + 'static {}

/// Blanket implementation: every type satisfying the bounds is `Share`.
impl<T> Share for T where T: Clone + Debug + Send + Sync + 'static {}
79
80pub struct Channel<M> {
82 pub tx: broadcast::Sender<M>,
83 pub rx: broadcast::Receiver<M>,
84}
85
86impl<M: Clone> Default for Channel<M> {
87 fn default() -> Self {
88 let (tx, rx) = broadcast::channel(1000);
89 Self { tx, rx }
90 }
91}
92
93pub async fn im<S, T, M, R>(
99 state: S,
100 viewport: Viewport,
101 channel: Channel<M>,
102 processors: Vec<T>,
103) -> Result<Option<R>>
104where
105 S: Update<M, Return = R> + Show<M> + Share,
106 T: Process<M> + Share,
107 M: Share,
108 R: Share,
109{
110 let (terminator, mut interrupt_rx) = create_termination();
111 let (state_tx, state_rx) = unbounded_channel();
112 let (event_tx, event_rx) = unbounded_channel();
113 let (work_tx, work_rx) = unbounded_channel();
114
115 let store = store::Store::<S, M, R>::new(state_tx.clone());
116 let worker = task::Worker::<T, M, R>::new(work_tx.clone());
117 let frontend = Frontend::default();
118 let stdin_reader = StdinReader::default();
119
120 let _ = tokio::try_join!(
122 worker.run(
123 processors,
124 channel.rx.resubscribe(),
125 interrupt_rx.resubscribe()
126 ),
127 store.run(
128 state,
129 terminator,
130 channel.rx.resubscribe(),
131 work_rx,
132 interrupt_rx.resubscribe(),
133 ),
134 frontend.run(
135 channel.tx,
136 state_rx,
137 event_rx,
138 interrupt_rx.resubscribe(),
139 viewport
140 ),
141 stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
142 )?;
143
144 if let Ok(reason) = interrupt_rx.recv().await {
145 match reason {
146 Interrupted::User { payload } => Ok(payload),
147 Interrupted::OsSignal => anyhow::bail!("exited because of an os sig int"),
148 }
149 } else {
150 anyhow::bail!("exited because of an unexpected error");
151 }
152}
153
154#[derive(Debug, Clone)]
157pub enum Interrupted<P>
158where
159 P: Share,
160{
161 OsSignal,
162 User { payload: Option<P> },
163}
164
165#[derive(Debug, Clone)]
167pub struct Terminator<P>
168where
169 P: Share,
170{
171 interrupt_tx: broadcast::Sender<Interrupted<P>>,
172}
173
174impl<P> Terminator<P>
175where
176 P: Share,
177{
178 pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
180 Self { interrupt_tx }
181 }
182
183 pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
185 self.interrupt_tx.send(interrupted)?;
186
187 Ok(())
188 }
189}
190
191#[cfg(unix)]
193async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
194where
195 P: Share,
196{
197 let mut interrupt_signal = signal(tokio::signal::unix::SignalKind::interrupt())
198 .expect("failed to create interrupt signal stream");
199
200 interrupt_signal.recv().await;
201
202 terminator
203 .terminate(Interrupted::OsSignal)
204 .expect("failed to send interrupt signal");
205}
206
207pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
209where
210 P: Share,
211{
212 let (tx, rx) = broadcast::channel(1);
213 let terminator = Terminator::new(tx);
214
215 #[cfg(unix)]
216 tokio::spawn(terminate_by_unix_signal(terminator.clone()));
217
218 (terminator, rx)
219}