worterbuch_cli/
lib.rs

1/*
2 *  Worterbuch cli clients common module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value};
22use std::{
23    io::{BufRead, BufReader},
24    ops::ControlFlow,
25    thread,
26    time::Duration,
27};
28use tokio::{select, spawn, sync::mpsc, time::sleep};
29use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
30use worterbuch_client::{
31    Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
32    State, StateEvent,
33};
34
35pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
36    if done {
37        sleep(Duration::from_secs(10)).await;
38        None
39    } else {
40        rx.recv().await
41    }
42}
43
44pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
45    if let Some(keys) = keys {
46        spawn(async move {
47            for key in keys {
48                if tx.send(key).await.is_err() {
49                    break;
50                }
51            }
52            drop(tx);
53        });
54    } else {
55        subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
56            let (lines_tx, mut lines_rx) = mpsc::channel(1);
57            thread::spawn(move || {
58                let mut lines = BufReader::new(std::io::stdin()).lines();
59                while let Some(Ok(line)) = lines.next() {
60                    if let Err(e) = lines_tx.blocking_send(line) {
61                        log::error!("Could not forward line from stdin: {e}");
62                    }
63                }
64            });
65            loop {
66                select! {
67                    _ = s.on_shutdown_requested() => break,
68                    recv = lines_rx.recv() => if let Some(key) = recv {
69                        if tx.send(key).await.is_err() {
70                            break;
71                        }
72                    } else {
73                        break;
74                    }
75                }
76            }
77            Ok(()) as anyhow::Result<()>
78        }));
79    }
80}
81
82pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
83    subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
84        let (lines_tx, mut lines_rx) = mpsc::channel(1);
85        thread::spawn(move || {
86            let mut lines = BufReader::new(std::io::stdin()).lines();
87            while let Some(Ok(line)) = lines.next() {
88                if let Err(e) = lines_tx.blocking_send(line) {
89                    log::error!("Could not forward line from stdin: {e}");
90                }
91            }
92        });
93        loop {
94            select! {
95                _ = s.on_shutdown_requested() => break,
96                recv = lines_rx.recv() => if let Some(line) = recv {
97                    if json {
98                        match serde_json::from_str::<Value>(&line) {
99                            Ok(value) => {
100                                if tx.send(value).await.is_err() {
101                                    break;
102                                }
103                            }
104                            Err(e) => {
105                                eprintln!("Error parsing json: {e}");
106                            }
107                        }
108                    } else if tx.send(json!(line)).await.is_err() {
109                        break;
110                    }
111                } else {
112                    break;
113                }
114            }
115        }
116        Ok(()) as anyhow::Result<()>
117    }));
118}
119
120pub fn provide_key_value_pairs(
121    key_value_pairs: Option<Vec<String>>,
122    json: bool,
123    subsys: SubsystemHandle,
124    tx: mpsc::Sender<(Key, Value)>,
125) {
126    if let Some(key_value_pairs) = key_value_pairs {
127        spawn(async move {
128            for kvp in key_value_pairs {
129                if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
130                    break;
131                }
132            }
133        });
134    } else {
135        let (lines_tx, mut lines_rx) = mpsc::channel(1);
136        thread::spawn(move || {
137            let mut lines = BufReader::new(std::io::stdin()).lines();
138            while let Some(Ok(line)) = lines.next() {
139                if let Err(e) = lines_tx.blocking_send(line) {
140                    log::error!("Could not forward line from stdin: {e}");
141                }
142            }
143        });
144        subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
145            loop {
146                select! {
147                    _ = s.on_shutdown_requested() => break,
148                    recv = lines_rx.recv() => if let Some(line) = recv {
149                        if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
150                            break;
151                        }
152                    } else {
153                        break;
154                    }
155                }
156            }
157            Ok(()) as anyhow::Result<()>
158        }));
159    }
160}
161
162#[derive(Debug, Deserialize)]
163enum Line {
164    #[serde(untagged)]
165    Kvp(KeyValuePair),
166    #[serde(untagged)]
167    Kvps(KeyValuePairs),
168}
169
170async fn provide_key_value_pair(
171    json: bool,
172    line: String,
173    tx: &mpsc::Sender<(String, Value)>,
174) -> ControlFlow<()> {
175    if json {
176        match serde_json::from_str::<Line>(&line) {
177            Ok(Line::Kvp(KeyValuePair { key, value })) => {
178                if tx.send((key, value)).await.is_err() {
179                    return ControlFlow::Break(());
180                }
181            }
182            Ok(Line::Kvps(kvps)) => {
183                for KeyValuePair { key, value } in kvps {
184                    if tx.send((key, value)).await.is_err() {
185                        return ControlFlow::Break(());
186                    }
187                }
188            }
189            Err(e) => {
190                eprintln!("Error parsing json: {e}");
191            }
192        }
193    } else if let Some(index) = line.find('=') {
194        let key = line[..index].to_owned();
195        let value = line[index + 1..].to_owned();
196        if tx.send((key, json!(value))).await.is_err() {
197            return ControlFlow::Break(());
198        }
199    } else {
200        eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
201    }
202    ControlFlow::Continue(())
203}
204
205pub fn print_message(msg: &SM, json: bool, raw: bool) {
206    match msg {
207        SM::PState(msg) => print_pstate(msg, json, raw),
208        SM::State(msg) => print_state(msg, json, raw),
209        SM::Err(msg) => print_err(msg, json),
210        SM::LsState(msg) => print_ls(msg, json),
211        _ => (),
212    }
213}
214
215pub fn print_change_event(msg: &SM, json: bool) {
216    match msg {
217        SM::PState(msg) => print_pstate_change(msg, json),
218        SM::State(msg) => print_state_change(msg, json),
219        SM::Err(msg) => print_err(msg, json),
220        _ => (),
221    }
222}
223
224pub fn print_del_event(msg: &SM, json: bool) {
225    match msg {
226        SM::PState(msg) => print_pstate_del(msg, json),
227        SM::State(msg) => print_state_del(msg, json),
228        SM::Err(msg) => print_err(msg, json),
229        _ => (),
230    }
231}
232
233fn print_pstate(msg: &PState, json: bool, raw: bool) {
234    match (json, raw) {
235        (true, true) => print_msg_as_json(&msg.event),
236        (true, false) => print_msg_as_json(msg),
237        (false, true) => match &msg.event {
238            PStateEvent::KeyValuePairs(kvps) => {
239                for kvp in kvps {
240                    println!("{kvp}");
241                }
242            }
243            PStateEvent::Deleted(kvps) => {
244                for kvp in kvps {
245                    println!("{}={}", kvp.key, Value::Null);
246                }
247            }
248        },
249        (false, false) => println!("{msg}"),
250    }
251}
252
253fn print_state(msg: &State, json: bool, raw: bool) {
254    match (json, raw) {
255        (true, true) => {
256            if let StateEvent::Value(val) = &msg.event {
257                print_msg_as_json(val);
258            } else {
259                print_msg_as_json(Value::Null);
260            }
261        }
262        (true, false) => print_msg_as_json(msg),
263        (false, true) => {
264            if let StateEvent::Value(val) = &msg.event {
265                println!("{}", val);
266            } else {
267                println!("{}", Value::Null);
268            }
269        }
270        (false, false) => println!("{msg}"),
271    }
272}
273
274fn print_ls(msg: &LsState, json: bool) {
275    if json {
276        print_msg_as_json(msg);
277    } else {
278        println!("{msg}");
279    }
280}
281
282fn print_err(msg: &Err, json: bool) {
283    if json {
284        print_msg_as_json(msg);
285    } else {
286        eprintln!("{msg}");
287    }
288}
289
290fn print_msg_as_json(msg: impl Serialize) {
291    match serde_json::to_string(&msg) {
292        Ok(json) => println!("{json}"),
293        Err(e) => {
294            eprintln!("Error converting message to json: {e}");
295        }
296    }
297}
298
299fn print_state_change(msg: &State, json: bool) {
300    if json {
301        if let StateEvent::Value(val) = &msg.event {
302            print_msg_as_json(val);
303        }
304    } else if let StateEvent::Value(val) = &msg.event {
305        println!("{}", val);
306    }
307}
308
309fn print_state_del(msg: &State, json: bool) {
310    if json {
311        if let StateEvent::Deleted(val) = &msg.event {
312            print_msg_as_json(val);
313        }
314    } else if let StateEvent::Deleted(val) = &msg.event {
315        println!("{}", val);
316    }
317}
318
319fn print_pstate_change(msg: &PState, json: bool) {
320    if json {
321        if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
322            print_msg_as_json(kvps);
323        }
324    } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
325        for kvp in kvps {
326            println!("{kvp}");
327        }
328    }
329}
330
331fn print_pstate_del(msg: &PState, json: bool) {
332    if json {
333        if let PStateEvent::Deleted(kvps) = &msg.event {
334            print_msg_as_json(kvps);
335        }
336    } else if let PStateEvent::Deleted(kvps) = &msg.event {
337        for kvp in kvps {
338            println!("{kvp}");
339        }
340    }
341}