1use serde::{Deserialize, Serialize};
21use serde_json::{json, Value};
22use std::{
23 io::{BufRead, BufReader},
24 ops::ControlFlow,
25 thread,
26 time::Duration,
27};
28use tokio::{select, spawn, sync::mpsc, time::sleep};
29use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
30use worterbuch_client::{
31 Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
32 State, StateEvent,
33};
34
35pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
36 if done {
37 sleep(Duration::from_secs(10)).await;
38 None
39 } else {
40 rx.recv().await
41 }
42}
43
44pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
45 if let Some(keys) = keys {
46 spawn(async move {
47 for key in keys {
48 if tx.send(key).await.is_err() {
49 break;
50 }
51 }
52 drop(tx);
53 });
54 } else {
55 subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
56 let (lines_tx, mut lines_rx) = mpsc::channel(1);
57 thread::spawn(move || {
58 let mut lines = BufReader::new(std::io::stdin()).lines();
59 while let Some(Ok(line)) = lines.next() {
60 if let Err(e) = lines_tx.blocking_send(line) {
61 log::error!("Could not forward line from stdin: {e}");
62 }
63 }
64 });
65 loop {
66 select! {
67 _ = s.on_shutdown_requested() => break,
68 recv = lines_rx.recv() => if let Some(key) = recv {
69 if tx.send(key).await.is_err() {
70 break;
71 }
72 } else {
73 break;
74 }
75 }
76 }
77 Ok(()) as anyhow::Result<()>
78 }));
79 }
80}
81
82pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
83 subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
84 let (lines_tx, mut lines_rx) = mpsc::channel(1);
85 thread::spawn(move || {
86 let mut lines = BufReader::new(std::io::stdin()).lines();
87 while let Some(Ok(line)) = lines.next() {
88 if let Err(e) = lines_tx.blocking_send(line) {
89 log::error!("Could not forward line from stdin: {e}");
90 }
91 }
92 });
93 loop {
94 select! {
95 _ = s.on_shutdown_requested() => break,
96 recv = lines_rx.recv() => if let Some(line) = recv {
97 if json {
98 match serde_json::from_str::<Value>(&line) {
99 Ok(value) => {
100 if tx.send(value).await.is_err() {
101 break;
102 }
103 }
104 Err(e) => {
105 eprintln!("Error parsing json: {e}");
106 }
107 }
108 } else if tx.send(json!(line)).await.is_err() {
109 break;
110 }
111 } else {
112 break;
113 }
114 }
115 }
116 Ok(()) as anyhow::Result<()>
117 }));
118}
119
120pub fn provide_key_value_pairs(
121 key_value_pairs: Option<Vec<String>>,
122 json: bool,
123 subsys: SubsystemHandle,
124 tx: mpsc::Sender<(Key, Value)>,
125) {
126 if let Some(key_value_pairs) = key_value_pairs {
127 spawn(async move {
128 for kvp in key_value_pairs {
129 if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
130 break;
131 }
132 }
133 });
134 } else {
135 let (lines_tx, mut lines_rx) = mpsc::channel(1);
136 thread::spawn(move || {
137 let mut lines = BufReader::new(std::io::stdin()).lines();
138 while let Some(Ok(line)) = lines.next() {
139 if let Err(e) = lines_tx.blocking_send(line) {
140 log::error!("Could not forward line from stdin: {e}");
141 }
142 }
143 });
144 subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
145 loop {
146 select! {
147 _ = s.on_shutdown_requested() => break,
148 recv = lines_rx.recv() => if let Some(line) = recv {
149 if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
150 break;
151 }
152 } else {
153 break;
154 }
155 }
156 }
157 Ok(()) as anyhow::Result<()>
158 }));
159 }
160}
161
162#[derive(Debug, Deserialize)]
163enum Line {
164 #[serde(untagged)]
165 Kvp(KeyValuePair),
166 #[serde(untagged)]
167 Kvps(KeyValuePairs),
168}
169
170async fn provide_key_value_pair(
171 json: bool,
172 line: String,
173 tx: &mpsc::Sender<(String, Value)>,
174) -> ControlFlow<()> {
175 if json {
176 match serde_json::from_str::<Line>(&line) {
177 Ok(Line::Kvp(KeyValuePair { key, value })) => {
178 if tx.send((key, value)).await.is_err() {
179 return ControlFlow::Break(());
180 }
181 }
182 Ok(Line::Kvps(kvps)) => {
183 for KeyValuePair { key, value } in kvps {
184 if tx.send((key, value)).await.is_err() {
185 return ControlFlow::Break(());
186 }
187 }
188 }
189 Err(e) => {
190 eprintln!("Error parsing json: {e}");
191 }
192 }
193 } else if let Some(index) = line.find('=') {
194 let key = line[..index].to_owned();
195 let value = line[index + 1..].to_owned();
196 if tx.send((key, json!(value))).await.is_err() {
197 return ControlFlow::Break(());
198 }
199 } else {
200 eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
201 }
202 ControlFlow::Continue(())
203}
204
205pub fn print_message(msg: &SM, json: bool, raw: bool) {
206 match msg {
207 SM::PState(msg) => print_pstate(msg, json, raw),
208 SM::State(msg) => print_state(msg, json, raw),
209 SM::Err(msg) => print_err(msg, json),
210 SM::LsState(msg) => print_ls(msg, json),
211 _ => (),
212 }
213}
214
215pub fn print_change_event(msg: &SM, json: bool) {
216 match msg {
217 SM::PState(msg) => print_pstate_change(msg, json),
218 SM::State(msg) => print_state_change(msg, json),
219 SM::Err(msg) => print_err(msg, json),
220 _ => (),
221 }
222}
223
224pub fn print_del_event(msg: &SM, json: bool) {
225 match msg {
226 SM::PState(msg) => print_pstate_del(msg, json),
227 SM::State(msg) => print_state_del(msg, json),
228 SM::Err(msg) => print_err(msg, json),
229 _ => (),
230 }
231}
232
233fn print_pstate(msg: &PState, json: bool, raw: bool) {
234 match (json, raw) {
235 (true, true) => print_msg_as_json(&msg.event),
236 (true, false) => print_msg_as_json(msg),
237 (false, true) => match &msg.event {
238 PStateEvent::KeyValuePairs(kvps) => {
239 for kvp in kvps {
240 println!("{kvp}");
241 }
242 }
243 PStateEvent::Deleted(kvps) => {
244 for kvp in kvps {
245 println!("{}={}", kvp.key, Value::Null);
246 }
247 }
248 },
249 (false, false) => println!("{msg}"),
250 }
251}
252
253fn print_state(msg: &State, json: bool, raw: bool) {
254 match (json, raw) {
255 (true, true) => {
256 if let StateEvent::Value(val) = &msg.event {
257 print_msg_as_json(val);
258 } else {
259 print_msg_as_json(Value::Null);
260 }
261 }
262 (true, false) => print_msg_as_json(msg),
263 (false, true) => {
264 if let StateEvent::Value(val) = &msg.event {
265 println!("{}", val);
266 } else {
267 println!("{}", Value::Null);
268 }
269 }
270 (false, false) => println!("{msg}"),
271 }
272}
273
274fn print_ls(msg: &LsState, json: bool) {
275 if json {
276 print_msg_as_json(msg);
277 } else {
278 println!("{msg}");
279 }
280}
281
282fn print_err(msg: &Err, json: bool) {
283 if json {
284 print_msg_as_json(msg);
285 } else {
286 eprintln!("{msg}");
287 }
288}
289
290fn print_msg_as_json(msg: impl Serialize) {
291 match serde_json::to_string(&msg) {
292 Ok(json) => println!("{json}"),
293 Err(e) => {
294 eprintln!("Error converting message to json: {e}");
295 }
296 }
297}
298
299fn print_state_change(msg: &State, json: bool) {
300 if json {
301 if let StateEvent::Value(val) = &msg.event {
302 print_msg_as_json(val);
303 }
304 } else if let StateEvent::Value(val) = &msg.event {
305 println!("{}", val);
306 }
307}
308
309fn print_state_del(msg: &State, json: bool) {
310 if json {
311 if let StateEvent::Deleted(val) = &msg.event {
312 print_msg_as_json(val);
313 }
314 } else if let StateEvent::Deleted(val) = &msg.event {
315 println!("{}", val);
316 }
317}
318
319fn print_pstate_change(msg: &PState, json: bool) {
320 if json {
321 if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
322 print_msg_as_json(kvps);
323 }
324 } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
325 for kvp in kvps {
326 println!("{kvp}");
327 }
328 }
329}
330
331fn print_pstate_del(msg: &PState, json: bool) {
332 if json {
333 if let PStateEvent::Deleted(kvps) = &msg.event {
334 print_msg_as_json(kvps);
335 }
336 } else if let PStateEvent::Deleted(kvps) = &msg.event {
337 for kvp in kvps {
338 println!("{kvp}");
339 }
340 }
341}