worterbuch_cli/
lib.rs

1/*
2 *  Worterbuch cli clients common module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value};
22use std::{ops::ControlFlow, time::Duration};
23use tokio::{
24    io::{AsyncBufReadExt, BufReader},
25    select, spawn,
26    sync::mpsc,
27    time::sleep,
28};
29use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
30use worterbuch_client::{
31    Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
32    State, StateEvent,
33};
34
35pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
36    if done {
37        sleep(Duration::from_secs(10)).await;
38        None
39    } else {
40        rx.recv().await
41    }
42}
43
44pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
45    if let Some(keys) = keys {
46        spawn(async move {
47            for key in keys {
48                if tx.send(key).await.is_err() {
49                    break;
50                }
51            }
52            drop(tx);
53        });
54    } else {
55        subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
56            let mut lines = BufReader::new(tokio::io::stdin()).lines();
57            loop {
58                select! {
59                    _ = s.on_shutdown_requested() => break,
60                    recv = lines.next_line() => if let Ok(Some(key)) = recv {
61                        if tx.send(key).await.is_err() {
62                            break;
63                        }
64                    } else {
65                        break;
66                    }
67                }
68            }
69            Ok(()) as anyhow::Result<()>
70        }));
71    }
72}
73
74pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
75    subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
76        let mut lines = BufReader::new(tokio::io::stdin()).lines();
77        loop {
78            select! {
79                _ = s.on_shutdown_requested() => break,
80                recv = lines.next_line() => if let Ok(Some(line)) = recv {
81                    if json {
82                        match serde_json::from_str::<Value>(&line) {
83                            Ok(value) => {
84                                if tx.send(value).await.is_err() {
85                                    break;
86                                }
87                            }
88                            Err(e) => {
89                                eprintln!("Error parsing json: {e}");
90                            }
91                        }
92                    } else if tx.send(json!(line)).await.is_err() {
93                        break;
94                    }
95                } else {
96                    break;
97                }
98            }
99        }
100        Ok(()) as anyhow::Result<()>
101    }));
102}
103
104pub fn provide_key_value_pairs(
105    key_value_pairs: Option<Vec<String>>,
106    json: bool,
107    subsys: SubsystemHandle,
108    tx: mpsc::Sender<(Key, Value)>,
109) {
110    if let Some(key_value_pairs) = key_value_pairs {
111        spawn(async move {
112            for kvp in key_value_pairs {
113                if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
114                    break;
115                }
116            }
117        });
118    } else {
119        subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
120            let mut lines = BufReader::new(tokio::io::stdin()).lines();
121            loop {
122                select! {
123                    _ = s.on_shutdown_requested() => break,
124                    recv = lines.next_line() => if let Ok(Some(line)) = recv {
125                        if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
126                            break;
127                        }
128                    } else {
129                        break;
130                    }
131                }
132            }
133            Ok(()) as anyhow::Result<()>
134        }));
135    }
136}
137
138#[derive(Debug, Deserialize)]
139enum Line {
140    #[serde(untagged)]
141    Kvp(KeyValuePair),
142    #[serde(untagged)]
143    Kvps(KeyValuePairs),
144}
145
146async fn provide_key_value_pair(
147    json: bool,
148    line: String,
149    tx: &mpsc::Sender<(String, Value)>,
150) -> ControlFlow<()> {
151    if json {
152        match serde_json::from_str::<Line>(&line) {
153            Ok(Line::Kvp(KeyValuePair { key, value })) => {
154                if tx.send((key, value)).await.is_err() {
155                    return ControlFlow::Break(());
156                }
157            }
158            Ok(Line::Kvps(kvps)) => {
159                for KeyValuePair { key, value } in kvps {
160                    if tx.send((key, value)).await.is_err() {
161                        return ControlFlow::Break(());
162                    }
163                }
164            }
165            Err(e) => {
166                eprintln!("Error parsing json: {e}");
167            }
168        }
169    } else if let Some(index) = line.find('=') {
170        let key = line[..index].to_owned();
171        let value = line[index + 1..].to_owned();
172        if tx.send((key, json!(value))).await.is_err() {
173            return ControlFlow::Break(());
174        }
175    } else {
176        eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
177    }
178    ControlFlow::Continue(())
179}
180
181pub fn print_message(msg: &SM, json: bool, raw: bool) {
182    match msg {
183        SM::PState(msg) => print_pstate(msg, json, raw),
184        SM::State(msg) => print_state(msg, json, raw),
185        SM::Err(msg) => print_err(msg, json),
186        SM::LsState(msg) => print_ls(msg, json),
187        _ => (),
188    }
189}
190
191pub fn print_change_event(msg: &SM, json: bool) {
192    match msg {
193        SM::PState(msg) => print_pstate_change(msg, json),
194        SM::State(msg) => print_state_change(msg, json),
195        SM::Err(msg) => print_err(msg, json),
196        _ => (),
197    }
198}
199
200pub fn print_del_event(msg: &SM, json: bool) {
201    match msg {
202        SM::PState(msg) => print_pstate_del(msg, json),
203        SM::State(msg) => print_state_del(msg, json),
204        SM::Err(msg) => print_err(msg, json),
205        _ => (),
206    }
207}
208
209fn print_pstate(msg: &PState, json: bool, raw: bool) {
210    match (json, raw) {
211        (true, true) => print_msg_as_json(&msg.event),
212        (true, false) => print_msg_as_json(msg),
213        (false, true) => match &msg.event {
214            PStateEvent::KeyValuePairs(kvps) => {
215                for kvp in kvps {
216                    println!("{kvp}");
217                }
218            }
219            PStateEvent::Deleted(kvps) => {
220                for kvp in kvps {
221                    println!("{}={}", kvp.key, Value::Null);
222                }
223            }
224        },
225        (false, false) => println!("{msg}"),
226    }
227}
228
229fn print_state(msg: &State, json: bool, raw: bool) {
230    match (json, raw) {
231        (true, true) => {
232            if let StateEvent::Value(val) = &msg.event {
233                print_msg_as_json(val);
234            } else {
235                print_msg_as_json(Value::Null);
236            }
237        }
238        (true, false) => print_msg_as_json(msg),
239        (false, true) => {
240            if let StateEvent::Value(val) = &msg.event {
241                println!("{}", val);
242            } else {
243                println!("{}", Value::Null);
244            }
245        }
246        (false, false) => println!("{msg}"),
247    }
248}
249
250fn print_ls(msg: &LsState, json: bool) {
251    if json {
252        print_msg_as_json(msg);
253    } else {
254        println!("{msg}");
255    }
256}
257
258fn print_err(msg: &Err, json: bool) {
259    if json {
260        print_msg_as_json(msg);
261    } else {
262        eprintln!("{msg}");
263    }
264}
265
266fn print_msg_as_json(msg: impl Serialize) {
267    match serde_json::to_string(&msg) {
268        Ok(json) => println!("{json}"),
269        Err(e) => {
270            eprintln!("Error converting message to json: {e}");
271        }
272    }
273}
274
275fn print_state_change(msg: &State, json: bool) {
276    if json {
277        if let StateEvent::Value(val) = &msg.event {
278            print_msg_as_json(val);
279        }
280    } else if let StateEvent::Value(val) = &msg.event {
281        println!("{}", val);
282    }
283}
284
285fn print_state_del(msg: &State, json: bool) {
286    if json {
287        if let StateEvent::Deleted(val) = &msg.event {
288            print_msg_as_json(val);
289        }
290    } else if let StateEvent::Deleted(val) = &msg.event {
291        println!("{}", val);
292    }
293}
294
295fn print_pstate_change(msg: &PState, json: bool) {
296    if json {
297        if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
298            print_msg_as_json(kvps);
299        }
300    } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
301        for kvp in kvps {
302            println!("{kvp}");
303        }
304    }
305}
306
307fn print_pstate_del(msg: &PState, json: bool) {
308    if json {
309        if let PStateEvent::Deleted(kvps) = &msg.event {
310            print_msg_as_json(kvps);
311        }
312    } else if let PStateEvent::Deleted(kvps) = &msg.event {
313        for kvp in kvps {
314            println!("{kvp}");
315        }
316    }
317}