1use miette::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use std::{
24 io::{BufRead, BufReader},
25 ops::ControlFlow,
26 thread,
27 time::Duration,
28};
29use tokio::{select, spawn, sync::mpsc, time::sleep};
30use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
31use tracing::error;
32use worterbuch_client::{
33 Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
34 State, StateEvent,
35};
36
37pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
38 if done {
39 sleep(Duration::from_secs(10)).await;
40 None
41 } else {
42 rx.recv().await
43 }
44}
45
46pub fn provide_keys(
47 keys: Option<Vec<String>>,
48 subsys: &mut SubsystemHandle,
49 tx: mpsc::Sender<String>,
50) {
51 if let Some(keys) = keys {
52 spawn(async move {
53 for key in keys {
54 if tx.send(key).await.is_err() {
55 break;
56 }
57 }
58 drop(tx);
59 });
60 } else {
61 subsys.start(SubsystemBuilder::new(
62 "read-stdin",
63 async move |s: &mut SubsystemHandle| {
64 let (lines_tx, mut lines_rx) = mpsc::channel(1);
65 thread::spawn(move || {
66 let mut lines = BufReader::new(std::io::stdin()).lines();
67 while let Some(Ok(line)) = lines.next() {
68 if let Err(e) = lines_tx.blocking_send(line) {
69 error!("Could not forward line from stdin: {e}");
70 }
71 }
72 });
73 loop {
74 select! {
75 _ = s.on_shutdown_requested() => break,
76 recv = lines_rx.recv() => if let Some(key) = recv {
77 if tx.send(key).await.is_err() {
78 break;
79 }
80 } else {
81 break;
82 }
83 }
84 }
85 Ok(()) as Result<()>
86 },
87 ));
88 }
89}
90
91pub fn provide_values(json: bool, subsys: &mut SubsystemHandle, tx: mpsc::Sender<Value>) {
92 subsys.start(SubsystemBuilder::new(
93 "read-stdin",
94 async move |s: &mut SubsystemHandle| {
95 let (lines_tx, mut lines_rx) = mpsc::channel(1);
96 thread::spawn(move || {
97 let mut lines = BufReader::new(std::io::stdin()).lines();
98 while let Some(Ok(line)) = lines.next() {
99 if let Err(e) = lines_tx.blocking_send(line) {
100 error!("Could not forward line from stdin: {e}");
101 }
102 }
103 });
104 loop {
105 select! {
106 _ = s.on_shutdown_requested() => break,
107 recv = lines_rx.recv() => if let Some(line) = recv {
108 if json {
109 match serde_json::from_str::<Value>(&line) {
110 Ok(value) => {
111 if tx.send(value).await.is_err() {
112 break;
113 }
114 }
115 Err(e) => {
116 eprintln!("Error parsing json: {e}");
117 }
118 }
119 } else if tx.send(json!(line)).await.is_err() {
120 break;
121 }
122 } else {
123 break;
124 }
125 }
126 }
127 Ok(()) as Result<()>
128 },
129 ));
130}
131
132pub fn provide_key_value_pairs(
133 key_value_pairs: Option<Vec<String>>,
134 json: bool,
135 subsys: &mut SubsystemHandle,
136 tx: mpsc::Sender<(Key, Value)>,
137) {
138 if let Some(key_value_pairs) = key_value_pairs {
139 spawn(async move {
140 for kvp in key_value_pairs {
141 if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
142 break;
143 }
144 }
145 });
146 } else {
147 let (lines_tx, mut lines_rx) = mpsc::channel(1);
148 thread::spawn(move || {
149 let mut lines = BufReader::new(std::io::stdin()).lines();
150 while let Some(Ok(line)) = lines.next() {
151 if let Err(e) = lines_tx.blocking_send(line) {
152 error!("Could not forward line from stdin: {e}");
153 }
154 }
155 });
156 subsys.start(SubsystemBuilder::new("read-stdin", async move |s: &mut SubsystemHandle| {
157 loop {
158 select! {
159 _ = s.on_shutdown_requested() => break,
160 recv = lines_rx.recv() => if let Some(line) = recv {
161 if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
162 break;
163 }
164 } else {
165 break;
166 }
167 }
168 }
169 Ok(()) as Result<()>
170 }));
171 }
172}
173
/// One JSON input line as accepted by `provide_key_value_pair`.
///
/// Both variants are marked `#[serde(untagged)]`, so the JSON carries no variant name.
#[derive(Debug, Deserialize)]
enum Line {
    /// The line holds a single key/value pair.
    #[serde(untagged)]
    Kvp(KeyValuePair),
    /// The line holds a collection of key/value pairs.
    #[serde(untagged)]
    Kvps(KeyValuePairs),
}
181
182async fn provide_key_value_pair(
183 json: bool,
184 line: String,
185 tx: &mpsc::Sender<(String, Value)>,
186) -> ControlFlow<()> {
187 if json {
188 match serde_json::from_str::<Line>(&line) {
189 Ok(Line::Kvp(KeyValuePair { key, value })) => {
190 if tx.send((key, value)).await.is_err() {
191 return ControlFlow::Break(());
192 }
193 }
194 Ok(Line::Kvps(kvps)) => {
195 for KeyValuePair { key, value } in kvps {
196 if tx.send((key, value)).await.is_err() {
197 return ControlFlow::Break(());
198 }
199 }
200 }
201 Err(e) => {
202 eprintln!("Error parsing json: {e}");
203 }
204 }
205 } else if let Some(index) = line.find('=') {
206 let key = line[..index].to_owned();
207 let value = line[index + 1..].to_owned();
208 if tx.send((key, json!(value))).await.is_err() {
209 return ControlFlow::Break(());
210 }
211 } else {
212 eprintln!("no key/value pair (e.g. 'a=b'): {line}");
213 }
214 ControlFlow::Continue(())
215}
216
217pub fn print_message(msg: &SM, json: bool, raw: bool) {
218 match msg {
219 SM::PState(msg) => print_pstate(msg, json, raw),
220 SM::State(msg) => print_state(msg, json, raw),
221 SM::Err(msg) => print_err(msg, json),
222 SM::LsState(msg) => print_ls(msg, json),
223 _ => (),
224 }
225}
226
227pub fn print_change_event(msg: &SM, json: bool) {
228 match msg {
229 SM::PState(msg) => print_pstate_change(msg, json),
230 SM::State(msg) => print_state_change(msg, json),
231 SM::Err(msg) => print_err(msg, json),
232 _ => (),
233 }
234}
235
236pub fn print_del_event(msg: &SM, json: bool) {
237 match msg {
238 SM::PState(msg) => print_pstate_del(msg, json),
239 SM::State(msg) => print_state_del(msg, json),
240 SM::Err(msg) => print_err(msg, json),
241 _ => (),
242 }
243}
244
245fn print_pstate(msg: &PState, json: bool, raw: bool) {
246 match (json, raw) {
247 (true, true) => print_msg_as_json(&msg.event),
248 (true, false) => print_msg_as_json(msg),
249 (false, true) => match &msg.event {
250 PStateEvent::KeyValuePairs(kvps) => {
251 for kvp in kvps {
252 println!("{kvp}");
253 }
254 }
255 PStateEvent::Deleted(kvps) => {
256 for kvp in kvps {
257 println!("{}={}", kvp.key, Value::Null);
258 }
259 }
260 },
261 (false, false) => println!("{msg}"),
262 }
263}
264
265fn print_state(msg: &State, json: bool, raw: bool) {
266 match (json, raw) {
267 (true, true) => {
268 if let StateEvent::Value(val) = &msg.event {
269 print_msg_as_json(val);
270 } else {
271 print_msg_as_json(Value::Null);
272 }
273 }
274 (true, false) => print_msg_as_json(msg),
275 (false, true) => {
276 if let StateEvent::Value(val) = &msg.event {
277 println!("{val}");
278 } else {
279 println!("{}", Value::Null);
280 }
281 }
282 (false, false) => println!("{msg}"),
283 }
284}
285
286fn print_ls(msg: &LsState, json: bool) {
287 if json {
288 print_msg_as_json(msg);
289 } else {
290 println!("{msg}");
291 }
292}
293
294fn print_err(msg: &Err, json: bool) {
295 if json {
296 print_msg_as_json(msg);
297 } else {
298 eprintln!("{msg}");
299 }
300}
301
302fn print_msg_as_json(msg: impl Serialize) {
303 match serde_json::to_string(&msg) {
304 Ok(json) => println!("{json}"),
305 Err(e) => {
306 eprintln!("Error converting message to json: {e}");
307 }
308 }
309}
310
311fn print_state_change(msg: &State, json: bool) {
312 if json {
313 if let StateEvent::Value(val) = &msg.event {
314 print_msg_as_json(val);
315 }
316 } else if let StateEvent::Value(val) = &msg.event {
317 println!("{val}");
318 }
319}
320
321fn print_state_del(msg: &State, json: bool) {
322 if json {
323 if let StateEvent::Deleted(val) = &msg.event {
324 print_msg_as_json(val);
325 }
326 } else if let StateEvent::Deleted(val) = &msg.event {
327 println!("{val}");
328 }
329}
330
331fn print_pstate_change(msg: &PState, json: bool) {
332 if json {
333 if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
334 print_msg_as_json(kvps);
335 }
336 } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
337 for kvp in kvps {
338 println!("{kvp}");
339 }
340 }
341}
342
343fn print_pstate_del(msg: &PState, json: bool) {
344 if json {
345 if let PStateEvent::Deleted(kvps) = &msg.event {
346 print_msg_as_json(kvps);
347 }
348 } else if let PStateEvent::Deleted(kvps) = &msg.event {
349 for kvp in kvps {
350 println!("{kvp}");
351 }
352 }
353}