worterbuch_cli/
lib.rs

1/*
2 *  Worterbuch cli clients common module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use miette::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use std::{
24    io::{BufRead, BufReader},
25    ops::ControlFlow,
26    thread,
27    time::Duration,
28};
29use tokio::{select, spawn, sync::mpsc, time::sleep};
30use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
31use tracing::error;
32use worterbuch_client::{
33    Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
34    State, StateEvent,
35};
36
37pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
38    if done {
39        sleep(Duration::from_secs(10)).await;
40        None
41    } else {
42        rx.recv().await
43    }
44}
45
46pub fn provide_keys(
47    keys: Option<Vec<String>>,
48    subsys: &mut SubsystemHandle,
49    tx: mpsc::Sender<String>,
50) {
51    if let Some(keys) = keys {
52        spawn(async move {
53            for key in keys {
54                if tx.send(key).await.is_err() {
55                    break;
56                }
57            }
58            drop(tx);
59        });
60    } else {
61        subsys.start(SubsystemBuilder::new(
62            "read-stdin",
63            async move |s: &mut SubsystemHandle| {
64                let (lines_tx, mut lines_rx) = mpsc::channel(1);
65                thread::spawn(move || {
66                    let mut lines = BufReader::new(std::io::stdin()).lines();
67                    while let Some(Ok(line)) = lines.next() {
68                        if let Err(e) = lines_tx.blocking_send(line) {
69                            error!("Could not forward line from stdin: {e}");
70                        }
71                    }
72                });
73                loop {
74                    select! {
75                        _ = s.on_shutdown_requested() => break,
76                        recv = lines_rx.recv() => if let Some(key) = recv {
77                            if tx.send(key).await.is_err() {
78                                break;
79                            }
80                        } else {
81                            break;
82                        }
83                    }
84                }
85                Ok(()) as Result<()>
86            },
87        ));
88    }
89}
90
91pub fn provide_values(json: bool, subsys: &mut SubsystemHandle, tx: mpsc::Sender<Value>) {
92    subsys.start(SubsystemBuilder::new(
93        "read-stdin",
94        async move |s: &mut SubsystemHandle| {
95            let (lines_tx, mut lines_rx) = mpsc::channel(1);
96            thread::spawn(move || {
97                let mut lines = BufReader::new(std::io::stdin()).lines();
98                while let Some(Ok(line)) = lines.next() {
99                    if let Err(e) = lines_tx.blocking_send(line) {
100                        error!("Could not forward line from stdin: {e}");
101                    }
102                }
103            });
104            loop {
105                select! {
106                    _ = s.on_shutdown_requested() => break,
107                    recv = lines_rx.recv() => if let Some(line) = recv {
108                        if json {
109                            match serde_json::from_str::<Value>(&line) {
110                                Ok(value) => {
111                                    if tx.send(value).await.is_err() {
112                                        break;
113                                    }
114                                }
115                                Err(e) => {
116                                    eprintln!("Error parsing json: {e}");
117                                }
118                            }
119                        } else if tx.send(json!(line)).await.is_err() {
120                            break;
121                        }
122                    } else {
123                        break;
124                    }
125                }
126            }
127            Ok(()) as Result<()>
128        },
129    ));
130}
131
132pub fn provide_key_value_pairs(
133    key_value_pairs: Option<Vec<String>>,
134    json: bool,
135    subsys: &mut SubsystemHandle,
136    tx: mpsc::Sender<(Key, Value)>,
137) {
138    if let Some(key_value_pairs) = key_value_pairs {
139        spawn(async move {
140            for kvp in key_value_pairs {
141                if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
142                    break;
143                }
144            }
145        });
146    } else {
147        let (lines_tx, mut lines_rx) = mpsc::channel(1);
148        thread::spawn(move || {
149            let mut lines = BufReader::new(std::io::stdin()).lines();
150            while let Some(Ok(line)) = lines.next() {
151                if let Err(e) = lines_tx.blocking_send(line) {
152                    error!("Could not forward line from stdin: {e}");
153                }
154            }
155        });
156        subsys.start(SubsystemBuilder::new("read-stdin", async move |s: &mut SubsystemHandle| {
157            loop {
158                select! {
159                    _ = s.on_shutdown_requested() => break,
160                    recv = lines_rx.recv() => if let Some(line) = recv {
161                        if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
162                            break;
163                        }
164                    } else {
165                        break;
166                    }
167                }
168            }
169            Ok(()) as Result<()>
170        }));
171    }
172}
173
174#[derive(Debug, Deserialize)]
175enum Line {
176    #[serde(untagged)]
177    Kvp(KeyValuePair),
178    #[serde(untagged)]
179    Kvps(KeyValuePairs),
180}
181
182async fn provide_key_value_pair(
183    json: bool,
184    line: String,
185    tx: &mpsc::Sender<(String, Value)>,
186) -> ControlFlow<()> {
187    if json {
188        match serde_json::from_str::<Line>(&line) {
189            Ok(Line::Kvp(KeyValuePair { key, value })) => {
190                if tx.send((key, value)).await.is_err() {
191                    return ControlFlow::Break(());
192                }
193            }
194            Ok(Line::Kvps(kvps)) => {
195                for KeyValuePair { key, value } in kvps {
196                    if tx.send((key, value)).await.is_err() {
197                        return ControlFlow::Break(());
198                    }
199                }
200            }
201            Err(e) => {
202                eprintln!("Error parsing json: {e}");
203            }
204        }
205    } else if let Some(index) = line.find('=') {
206        let key = line[..index].to_owned();
207        let value = line[index + 1..].to_owned();
208        if tx.send((key, json!(value))).await.is_err() {
209            return ControlFlow::Break(());
210        }
211    } else {
212        eprintln!("no key/value pair (e.g. 'a=b'): {line}");
213    }
214    ControlFlow::Continue(())
215}
216
217pub fn print_message(msg: &SM, json: bool, raw: bool) {
218    match msg {
219        SM::PState(msg) => print_pstate(msg, json, raw),
220        SM::State(msg) => print_state(msg, json, raw),
221        SM::Err(msg) => print_err(msg, json),
222        SM::LsState(msg) => print_ls(msg, json),
223        _ => (),
224    }
225}
226
227pub fn print_change_event(msg: &SM, json: bool) {
228    match msg {
229        SM::PState(msg) => print_pstate_change(msg, json),
230        SM::State(msg) => print_state_change(msg, json),
231        SM::Err(msg) => print_err(msg, json),
232        _ => (),
233    }
234}
235
236pub fn print_del_event(msg: &SM, json: bool) {
237    match msg {
238        SM::PState(msg) => print_pstate_del(msg, json),
239        SM::State(msg) => print_state_del(msg, json),
240        SM::Err(msg) => print_err(msg, json),
241        _ => (),
242    }
243}
244
245fn print_pstate(msg: &PState, json: bool, raw: bool) {
246    match (json, raw) {
247        (true, true) => print_msg_as_json(&msg.event),
248        (true, false) => print_msg_as_json(msg),
249        (false, true) => match &msg.event {
250            PStateEvent::KeyValuePairs(kvps) => {
251                for kvp in kvps {
252                    println!("{kvp}");
253                }
254            }
255            PStateEvent::Deleted(kvps) => {
256                for kvp in kvps {
257                    println!("{}={}", kvp.key, Value::Null);
258                }
259            }
260        },
261        (false, false) => println!("{msg}"),
262    }
263}
264
265fn print_state(msg: &State, json: bool, raw: bool) {
266    match (json, raw) {
267        (true, true) => {
268            if let StateEvent::Value(val) = &msg.event {
269                print_msg_as_json(val);
270            } else {
271                print_msg_as_json(Value::Null);
272            }
273        }
274        (true, false) => print_msg_as_json(msg),
275        (false, true) => {
276            if let StateEvent::Value(val) = &msg.event {
277                println!("{val}");
278            } else {
279                println!("{}", Value::Null);
280            }
281        }
282        (false, false) => println!("{msg}"),
283    }
284}
285
286fn print_ls(msg: &LsState, json: bool) {
287    if json {
288        print_msg_as_json(msg);
289    } else {
290        println!("{msg}");
291    }
292}
293
294fn print_err(msg: &Err, json: bool) {
295    if json {
296        print_msg_as_json(msg);
297    } else {
298        eprintln!("{msg}");
299    }
300}
301
302fn print_msg_as_json(msg: impl Serialize) {
303    match serde_json::to_string(&msg) {
304        Ok(json) => println!("{json}"),
305        Err(e) => {
306            eprintln!("Error converting message to json: {e}");
307        }
308    }
309}
310
311fn print_state_change(msg: &State, json: bool) {
312    if json {
313        if let StateEvent::Value(val) = &msg.event {
314            print_msg_as_json(val);
315        }
316    } else if let StateEvent::Value(val) = &msg.event {
317        println!("{val}");
318    }
319}
320
321fn print_state_del(msg: &State, json: bool) {
322    if json {
323        if let StateEvent::Deleted(val) = &msg.event {
324            print_msg_as_json(val);
325        }
326    } else if let StateEvent::Deleted(val) = &msg.event {
327        println!("{val}");
328    }
329}
330
331fn print_pstate_change(msg: &PState, json: bool) {
332    if json {
333        if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
334            print_msg_as_json(kvps);
335        }
336    } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
337        for kvp in kvps {
338            println!("{kvp}");
339        }
340    }
341}
342
343fn print_pstate_del(msg: &PState, json: bool) {
344    if json {
345        if let PStateEvent::Deleted(kvps) = &msg.event {
346            print_msg_as_json(kvps);
347        }
348    } else if let PStateEvent::Deleted(kvps) = &msg.event {
349        for kvp in kvps {
350            println!("{kvp}");
351        }
352    }
353}