1use serde::{Deserialize, Serialize};
21use serde_json::{json, Value};
22use std::{ops::ControlFlow, time::Duration};
23use tokio::{
24 io::{AsyncBufReadExt, BufReader},
25 select, spawn,
26 sync::mpsc,
27 time::sleep,
28};
29use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
30use worterbuch_client::{
31 Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
32 State, StateEvent,
33};
34
35pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
36 if done {
37 sleep(Duration::from_secs(10)).await;
38 None
39 } else {
40 rx.recv().await
41 }
42}
43
44pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
45 if let Some(keys) = keys {
46 spawn(async move {
47 for key in keys {
48 if tx.send(key).await.is_err() {
49 break;
50 }
51 }
52 drop(tx);
53 });
54 } else {
55 subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
56 let mut lines = BufReader::new(tokio::io::stdin()).lines();
57 loop {
58 select! {
59 _ = s.on_shutdown_requested() => break,
60 recv = lines.next_line() => if let Ok(Some(key)) = recv {
61 if tx.send(key).await.is_err() {
62 break;
63 }
64 } else {
65 break;
66 }
67 }
68 }
69 Ok(()) as anyhow::Result<()>
70 }));
71 }
72}
73
74pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
75 subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
76 let mut lines = BufReader::new(tokio::io::stdin()).lines();
77 loop {
78 select! {
79 _ = s.on_shutdown_requested() => break,
80 recv = lines.next_line() => if let Ok(Some(line)) = recv {
81 if json {
82 match serde_json::from_str::<Value>(&line) {
83 Ok(value) => {
84 if tx.send(value).await.is_err() {
85 break;
86 }
87 }
88 Err(e) => {
89 eprintln!("Error parsing json: {e}");
90 }
91 }
92 } else if tx.send(json!(line)).await.is_err() {
93 break;
94 }
95 } else {
96 break;
97 }
98 }
99 }
100 Ok(()) as anyhow::Result<()>
101 }));
102}
103
104pub fn provide_key_value_pairs(
105 key_value_pairs: Option<Vec<String>>,
106 json: bool,
107 subsys: SubsystemHandle,
108 tx: mpsc::Sender<(Key, Value)>,
109) {
110 if let Some(key_value_pairs) = key_value_pairs {
111 spawn(async move {
112 for kvp in key_value_pairs {
113 if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
114 break;
115 }
116 }
117 });
118 } else {
119 subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
120 let mut lines = BufReader::new(tokio::io::stdin()).lines();
121 loop {
122 select! {
123 _ = s.on_shutdown_requested() => break,
124 recv = lines.next_line() => if let Ok(Some(line)) = recv {
125 if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
126 break;
127 }
128 } else {
129 break;
130 }
131 }
132 }
133 Ok(()) as anyhow::Result<()>
134 }));
135 }
136}
137
138#[derive(Debug, Deserialize)]
139enum Line {
140 #[serde(untagged)]
141 Kvp(KeyValuePair),
142 #[serde(untagged)]
143 Kvps(KeyValuePairs),
144}
145
146async fn provide_key_value_pair(
147 json: bool,
148 line: String,
149 tx: &mpsc::Sender<(String, Value)>,
150) -> ControlFlow<()> {
151 if json {
152 match serde_json::from_str::<Line>(&line) {
153 Ok(Line::Kvp(KeyValuePair { key, value })) => {
154 if tx.send((key, value)).await.is_err() {
155 return ControlFlow::Break(());
156 }
157 }
158 Ok(Line::Kvps(kvps)) => {
159 for KeyValuePair { key, value } in kvps {
160 if tx.send((key, value)).await.is_err() {
161 return ControlFlow::Break(());
162 }
163 }
164 }
165 Err(e) => {
166 eprintln!("Error parsing json: {e}");
167 }
168 }
169 } else if let Some(index) = line.find('=') {
170 let key = line[..index].to_owned();
171 let value = line[index + 1..].to_owned();
172 if tx.send((key, json!(value))).await.is_err() {
173 return ControlFlow::Break(());
174 }
175 } else {
176 eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
177 }
178 ControlFlow::Continue(())
179}
180
181pub fn print_message(msg: &SM, json: bool, raw: bool) {
182 match msg {
183 SM::PState(msg) => print_pstate(msg, json, raw),
184 SM::State(msg) => print_state(msg, json, raw),
185 SM::Err(msg) => print_err(msg, json),
186 SM::LsState(msg) => print_ls(msg, json),
187 _ => (),
188 }
189}
190
191pub fn print_change_event(msg: &SM, json: bool) {
192 match msg {
193 SM::PState(msg) => print_pstate_change(msg, json),
194 SM::State(msg) => print_state_change(msg, json),
195 SM::Err(msg) => print_err(msg, json),
196 _ => (),
197 }
198}
199
200pub fn print_del_event(msg: &SM, json: bool) {
201 match msg {
202 SM::PState(msg) => print_pstate_del(msg, json),
203 SM::State(msg) => print_state_del(msg, json),
204 SM::Err(msg) => print_err(msg, json),
205 _ => (),
206 }
207}
208
209fn print_pstate(msg: &PState, json: bool, raw: bool) {
210 match (json, raw) {
211 (true, true) => print_msg_as_json(&msg.event),
212 (true, false) => print_msg_as_json(msg),
213 (false, true) => match &msg.event {
214 PStateEvent::KeyValuePairs(kvps) => {
215 for kvp in kvps {
216 println!("{kvp}");
217 }
218 }
219 PStateEvent::Deleted(kvps) => {
220 for kvp in kvps {
221 println!("{}={}", kvp.key, Value::Null);
222 }
223 }
224 },
225 (false, false) => println!("{msg}"),
226 }
227}
228
229fn print_state(msg: &State, json: bool, raw: bool) {
230 match (json, raw) {
231 (true, true) => {
232 if let StateEvent::Value(val) = &msg.event {
233 print_msg_as_json(val);
234 } else {
235 print_msg_as_json(Value::Null);
236 }
237 }
238 (true, false) => print_msg_as_json(msg),
239 (false, true) => {
240 if let StateEvent::Value(val) = &msg.event {
241 println!("{}", val);
242 } else {
243 println!("{}", Value::Null);
244 }
245 }
246 (false, false) => println!("{msg}"),
247 }
248}
249
250fn print_ls(msg: &LsState, json: bool) {
251 if json {
252 print_msg_as_json(msg);
253 } else {
254 println!("{msg}");
255 }
256}
257
258fn print_err(msg: &Err, json: bool) {
259 if json {
260 print_msg_as_json(msg);
261 } else {
262 eprintln!("{msg}");
263 }
264}
265
266fn print_msg_as_json(msg: impl Serialize) {
267 match serde_json::to_string(&msg) {
268 Ok(json) => println!("{json}"),
269 Err(e) => {
270 eprintln!("Error converting message to json: {e}");
271 }
272 }
273}
274
275fn print_state_change(msg: &State, json: bool) {
276 if json {
277 if let StateEvent::Value(val) = &msg.event {
278 print_msg_as_json(val);
279 }
280 } else if let StateEvent::Value(val) = &msg.event {
281 println!("{}", val);
282 }
283}
284
285fn print_state_del(msg: &State, json: bool) {
286 if json {
287 if let StateEvent::Deleted(val) = &msg.event {
288 print_msg_as_json(val);
289 }
290 } else if let StateEvent::Deleted(val) = &msg.event {
291 println!("{}", val);
292 }
293}
294
295fn print_pstate_change(msg: &PState, json: bool) {
296 if json {
297 if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
298 print_msg_as_json(kvps);
299 }
300 } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
301 for kvp in kvps {
302 println!("{kvp}");
303 }
304 }
305}
306
307fn print_pstate_del(msg: &PState, json: bool) {
308 if json {
309 if let PStateEvent::Deleted(kvps) = &msg.event {
310 print_msg_as_json(kvps);
311 }
312 } else if let PStateEvent::Deleted(kvps) = &msg.event {
313 for kvp in kvps {
314 println!("{kvp}");
315 }
316 }
317}