worterbuch_cli/
lib.rs

1/*
2 *  Worterbuch cli clients common module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use miette::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use std::{
24    io::{BufRead, BufReader},
25    ops::ControlFlow,
26    thread,
27    time::Duration,
28};
29use tokio::{select, spawn, sync::mpsc, time::sleep};
30use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
31use tracing::error;
32use worterbuch_client::{
33    Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
34    State, StateEvent,
35};
36
37pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
38    if done {
39        sleep(Duration::from_secs(10)).await;
40        None
41    } else {
42        rx.recv().await
43    }
44}
45
46pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
47    if let Some(keys) = keys {
48        spawn(async move {
49            for key in keys {
50                if tx.send(key).await.is_err() {
51                    break;
52                }
53            }
54            drop(tx);
55        });
56    } else {
57        subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
58            let (lines_tx, mut lines_rx) = mpsc::channel(1);
59            thread::spawn(move || {
60                let mut lines = BufReader::new(std::io::stdin()).lines();
61                while let Some(Ok(line)) = lines.next() {
62                    if let Err(e) = lines_tx.blocking_send(line) {
63                        error!("Could not forward line from stdin: {e}");
64                    }
65                }
66            });
67            loop {
68                select! {
69                    _ = s.on_shutdown_requested() => break,
70                    recv = lines_rx.recv() => if let Some(key) = recv {
71                        if tx.send(key).await.is_err() {
72                            break;
73                        }
74                    } else {
75                        break;
76                    }
77                }
78            }
79            Ok(()) as Result<()>
80        }));
81    }
82}
83
84pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
85    subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
86        let (lines_tx, mut lines_rx) = mpsc::channel(1);
87        thread::spawn(move || {
88            let mut lines = BufReader::new(std::io::stdin()).lines();
89            while let Some(Ok(line)) = lines.next() {
90                if let Err(e) = lines_tx.blocking_send(line) {
91                    error!("Could not forward line from stdin: {e}");
92                }
93            }
94        });
95        loop {
96            select! {
97                _ = s.on_shutdown_requested() => break,
98                recv = lines_rx.recv() => if let Some(line) = recv {
99                    if json {
100                        match serde_json::from_str::<Value>(&line) {
101                            Ok(value) => {
102                                if tx.send(value).await.is_err() {
103                                    break;
104                                }
105                            }
106                            Err(e) => {
107                                eprintln!("Error parsing json: {e}");
108                            }
109                        }
110                    } else if tx.send(json!(line)).await.is_err() {
111                        break;
112                    }
113                } else {
114                    break;
115                }
116            }
117        }
118        Ok(()) as Result<()>
119    }));
120}
121
122pub fn provide_key_value_pairs(
123    key_value_pairs: Option<Vec<String>>,
124    json: bool,
125    subsys: SubsystemHandle,
126    tx: mpsc::Sender<(Key, Value)>,
127) {
128    if let Some(key_value_pairs) = key_value_pairs {
129        spawn(async move {
130            for kvp in key_value_pairs {
131                if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
132                    break;
133                }
134            }
135        });
136    } else {
137        let (lines_tx, mut lines_rx) = mpsc::channel(1);
138        thread::spawn(move || {
139            let mut lines = BufReader::new(std::io::stdin()).lines();
140            while let Some(Ok(line)) = lines.next() {
141                if let Err(e) = lines_tx.blocking_send(line) {
142                    error!("Could not forward line from stdin: {e}");
143                }
144            }
145        });
146        subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
147            loop {
148                select! {
149                    _ = s.on_shutdown_requested() => break,
150                    recv = lines_rx.recv() => if let Some(line) = recv {
151                        if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
152                            break;
153                        }
154                    } else {
155                        break;
156                    }
157                }
158            }
159            Ok(()) as Result<()>
160        }));
161    }
162}
163
/// Deserialization target for one line of JSON input in `provide_key_value_pair`.
///
/// Both variants are marked `#[serde(untagged)]`, so the JSON carries no variant tag; the
/// line's content decides whether it is read as a single pair or as a collection of pairs.
#[derive(Debug, Deserialize)]
enum Line {
    /// The line holds exactly one key/value pair.
    #[serde(untagged)]
    Kvp(KeyValuePair),
    /// The line holds several key/value pairs.
    #[serde(untagged)]
    Kvps(KeyValuePairs),
}
171
172async fn provide_key_value_pair(
173    json: bool,
174    line: String,
175    tx: &mpsc::Sender<(String, Value)>,
176) -> ControlFlow<()> {
177    if json {
178        match serde_json::from_str::<Line>(&line) {
179            Ok(Line::Kvp(KeyValuePair { key, value })) => {
180                if tx.send((key, value)).await.is_err() {
181                    return ControlFlow::Break(());
182                }
183            }
184            Ok(Line::Kvps(kvps)) => {
185                for KeyValuePair { key, value } in kvps {
186                    if tx.send((key, value)).await.is_err() {
187                        return ControlFlow::Break(());
188                    }
189                }
190            }
191            Err(e) => {
192                eprintln!("Error parsing json: {e}");
193            }
194        }
195    } else if let Some(index) = line.find('=') {
196        let key = line[..index].to_owned();
197        let value = line[index + 1..].to_owned();
198        if tx.send((key, json!(value))).await.is_err() {
199            return ControlFlow::Break(());
200        }
201    } else {
202        eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
203    }
204    ControlFlow::Continue(())
205}
206
207pub fn print_message(msg: &SM, json: bool, raw: bool) {
208    match msg {
209        SM::PState(msg) => print_pstate(msg, json, raw),
210        SM::State(msg) => print_state(msg, json, raw),
211        SM::Err(msg) => print_err(msg, json),
212        SM::LsState(msg) => print_ls(msg, json),
213        _ => (),
214    }
215}
216
217pub fn print_change_event(msg: &SM, json: bool) {
218    match msg {
219        SM::PState(msg) => print_pstate_change(msg, json),
220        SM::State(msg) => print_state_change(msg, json),
221        SM::Err(msg) => print_err(msg, json),
222        _ => (),
223    }
224}
225
226pub fn print_del_event(msg: &SM, json: bool) {
227    match msg {
228        SM::PState(msg) => print_pstate_del(msg, json),
229        SM::State(msg) => print_state_del(msg, json),
230        SM::Err(msg) => print_err(msg, json),
231        _ => (),
232    }
233}
234
235fn print_pstate(msg: &PState, json: bool, raw: bool) {
236    match (json, raw) {
237        (true, true) => print_msg_as_json(&msg.event),
238        (true, false) => print_msg_as_json(msg),
239        (false, true) => match &msg.event {
240            PStateEvent::KeyValuePairs(kvps) => {
241                for kvp in kvps {
242                    println!("{kvp}");
243                }
244            }
245            PStateEvent::Deleted(kvps) => {
246                for kvp in kvps {
247                    println!("{}={}", kvp.key, Value::Null);
248                }
249            }
250        },
251        (false, false) => println!("{msg}"),
252    }
253}
254
255fn print_state(msg: &State, json: bool, raw: bool) {
256    match (json, raw) {
257        (true, true) => {
258            if let StateEvent::Value(val) = &msg.event {
259                print_msg_as_json(val);
260            } else {
261                print_msg_as_json(Value::Null);
262            }
263        }
264        (true, false) => print_msg_as_json(msg),
265        (false, true) => {
266            if let StateEvent::Value(val) = &msg.event {
267                println!("{}", val);
268            } else {
269                println!("{}", Value::Null);
270            }
271        }
272        (false, false) => println!("{msg}"),
273    }
274}
275
276fn print_ls(msg: &LsState, json: bool) {
277    if json {
278        print_msg_as_json(msg);
279    } else {
280        println!("{msg}");
281    }
282}
283
284fn print_err(msg: &Err, json: bool) {
285    if json {
286        print_msg_as_json(msg);
287    } else {
288        eprintln!("{msg}");
289    }
290}
291
292fn print_msg_as_json(msg: impl Serialize) {
293    match serde_json::to_string(&msg) {
294        Ok(json) => println!("{json}"),
295        Err(e) => {
296            eprintln!("Error converting message to json: {e}");
297        }
298    }
299}
300
301fn print_state_change(msg: &State, json: bool) {
302    if json {
303        if let StateEvent::Value(val) = &msg.event {
304            print_msg_as_json(val);
305        }
306    } else if let StateEvent::Value(val) = &msg.event {
307        println!("{}", val);
308    }
309}
310
311fn print_state_del(msg: &State, json: bool) {
312    if json {
313        if let StateEvent::Deleted(val) = &msg.event {
314            print_msg_as_json(val);
315        }
316    } else if let StateEvent::Deleted(val) = &msg.event {
317        println!("{}", val);
318    }
319}
320
321fn print_pstate_change(msg: &PState, json: bool) {
322    if json {
323        if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
324            print_msg_as_json(kvps);
325        }
326    } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
327        for kvp in kvps {
328            println!("{kvp}");
329        }
330    }
331}
332
333fn print_pstate_del(msg: &PState, json: bool) {
334    if json {
335        if let PStateEvent::Deleted(kvps) = &msg.event {
336            print_msg_as_json(kvps);
337        }
338    } else if let PStateEvent::Deleted(kvps) = &msg.event {
339        for kvp in kvps {
340            println!("{kvp}");
341        }
342    }
343}