1use miette::Result;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use std::{
24 io::{BufRead, BufReader},
25 ops::ControlFlow,
26 thread,
27 time::Duration,
28};
29use tokio::{select, spawn, sync::mpsc, time::sleep};
30use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
31use tracing::error;
32use worterbuch_client::{
33 Err, Key, KeyValuePair, KeyValuePairs, LsState, PState, PStateEvent, ServerMessage as SM,
34 State, StateEvent,
35};
36
37pub async fn next_item<T>(rx: &mut mpsc::Receiver<T>, done: bool) -> Option<T> {
38 if done {
39 sleep(Duration::from_secs(10)).await;
40 None
41 } else {
42 rx.recv().await
43 }
44}
45
46pub fn provide_keys(keys: Option<Vec<String>>, subsys: SubsystemHandle, tx: mpsc::Sender<String>) {
47 if let Some(keys) = keys {
48 spawn(async move {
49 for key in keys {
50 if tx.send(key).await.is_err() {
51 break;
52 }
53 }
54 drop(tx);
55 });
56 } else {
57 subsys.start(SubsystemBuilder::new("read-stdin", |s| async move {
58 let (lines_tx, mut lines_rx) = mpsc::channel(1);
59 thread::spawn(move || {
60 let mut lines = BufReader::new(std::io::stdin()).lines();
61 while let Some(Ok(line)) = lines.next() {
62 if let Err(e) = lines_tx.blocking_send(line) {
63 error!("Could not forward line from stdin: {e}");
64 }
65 }
66 });
67 loop {
68 select! {
69 _ = s.on_shutdown_requested() => break,
70 recv = lines_rx.recv() => if let Some(key) = recv {
71 if tx.send(key).await.is_err() {
72 break;
73 }
74 } else {
75 break;
76 }
77 }
78 }
79 Ok(()) as Result<()>
80 }));
81 }
82}
83
84pub fn provide_values(json: bool, subsys: SubsystemHandle, tx: mpsc::Sender<Value>) {
85 subsys.start(SubsystemBuilder::new("read-stdin", move |s| async move {
86 let (lines_tx, mut lines_rx) = mpsc::channel(1);
87 thread::spawn(move || {
88 let mut lines = BufReader::new(std::io::stdin()).lines();
89 while let Some(Ok(line)) = lines.next() {
90 if let Err(e) = lines_tx.blocking_send(line) {
91 error!("Could not forward line from stdin: {e}");
92 }
93 }
94 });
95 loop {
96 select! {
97 _ = s.on_shutdown_requested() => break,
98 recv = lines_rx.recv() => if let Some(line) = recv {
99 if json {
100 match serde_json::from_str::<Value>(&line) {
101 Ok(value) => {
102 if tx.send(value).await.is_err() {
103 break;
104 }
105 }
106 Err(e) => {
107 eprintln!("Error parsing json: {e}");
108 }
109 }
110 } else if tx.send(json!(line)).await.is_err() {
111 break;
112 }
113 } else {
114 break;
115 }
116 }
117 }
118 Ok(()) as Result<()>
119 }));
120}
121
122pub fn provide_key_value_pairs(
123 key_value_pairs: Option<Vec<String>>,
124 json: bool,
125 subsys: SubsystemHandle,
126 tx: mpsc::Sender<(Key, Value)>,
127) {
128 if let Some(key_value_pairs) = key_value_pairs {
129 spawn(async move {
130 for kvp in key_value_pairs {
131 if let ControlFlow::Break(_) = provide_key_value_pair(json, kvp, &tx).await {
132 break;
133 }
134 }
135 });
136 } else {
137 let (lines_tx, mut lines_rx) = mpsc::channel(1);
138 thread::spawn(move || {
139 let mut lines = BufReader::new(std::io::stdin()).lines();
140 while let Some(Ok(line)) = lines.next() {
141 if let Err(e) = lines_tx.blocking_send(line) {
142 error!("Could not forward line from stdin: {e}");
143 }
144 }
145 });
146 subsys.start(SubsystemBuilder::new("read-stdin", move|s| async move {
147 loop {
148 select! {
149 _ = s.on_shutdown_requested() => break,
150 recv = lines_rx.recv() => if let Some(line) = recv {
151 if let ControlFlow::Break(_) = provide_key_value_pair(json, line, &tx).await {
152 break;
153 }
154 } else {
155 break;
156 }
157 }
158 }
159 Ok(()) as Result<()>
160 }));
161 }
162}
163
/// One line of JSON input as accepted by `provide_key_value_pair`.
///
/// Both variants are marked `#[serde(untagged)]`, so no variant name appears
/// in the JSON; the input is deserialized directly as the payload type.
#[derive(Debug, Deserialize)]
enum Line {
    /// A single key/value pair.
    #[serde(untagged)]
    Kvp(KeyValuePair),
    /// A collection of key/value pairs, each of which is sent individually.
    #[serde(untagged)]
    Kvps(KeyValuePairs),
}
171
172async fn provide_key_value_pair(
173 json: bool,
174 line: String,
175 tx: &mpsc::Sender<(String, Value)>,
176) -> ControlFlow<()> {
177 if json {
178 match serde_json::from_str::<Line>(&line) {
179 Ok(Line::Kvp(KeyValuePair { key, value })) => {
180 if tx.send((key, value)).await.is_err() {
181 return ControlFlow::Break(());
182 }
183 }
184 Ok(Line::Kvps(kvps)) => {
185 for KeyValuePair { key, value } in kvps {
186 if tx.send((key, value)).await.is_err() {
187 return ControlFlow::Break(());
188 }
189 }
190 }
191 Err(e) => {
192 eprintln!("Error parsing json: {e}");
193 }
194 }
195 } else if let Some(index) = line.find('=') {
196 let key = line[..index].to_owned();
197 let value = line[index + 1..].to_owned();
198 if tx.send((key, json!(value))).await.is_err() {
199 return ControlFlow::Break(());
200 }
201 } else {
202 eprintln!("no key/value pair (e.g. 'a=b'): {}", line);
203 }
204 ControlFlow::Continue(())
205}
206
207pub fn print_message(msg: &SM, json: bool, raw: bool) {
208 match msg {
209 SM::PState(msg) => print_pstate(msg, json, raw),
210 SM::State(msg) => print_state(msg, json, raw),
211 SM::Err(msg) => print_err(msg, json),
212 SM::LsState(msg) => print_ls(msg, json),
213 _ => (),
214 }
215}
216
217pub fn print_change_event(msg: &SM, json: bool) {
218 match msg {
219 SM::PState(msg) => print_pstate_change(msg, json),
220 SM::State(msg) => print_state_change(msg, json),
221 SM::Err(msg) => print_err(msg, json),
222 _ => (),
223 }
224}
225
226pub fn print_del_event(msg: &SM, json: bool) {
227 match msg {
228 SM::PState(msg) => print_pstate_del(msg, json),
229 SM::State(msg) => print_state_del(msg, json),
230 SM::Err(msg) => print_err(msg, json),
231 _ => (),
232 }
233}
234
235fn print_pstate(msg: &PState, json: bool, raw: bool) {
236 match (json, raw) {
237 (true, true) => print_msg_as_json(&msg.event),
238 (true, false) => print_msg_as_json(msg),
239 (false, true) => match &msg.event {
240 PStateEvent::KeyValuePairs(kvps) => {
241 for kvp in kvps {
242 println!("{kvp}");
243 }
244 }
245 PStateEvent::Deleted(kvps) => {
246 for kvp in kvps {
247 println!("{}={}", kvp.key, Value::Null);
248 }
249 }
250 },
251 (false, false) => println!("{msg}"),
252 }
253}
254
255fn print_state(msg: &State, json: bool, raw: bool) {
256 match (json, raw) {
257 (true, true) => {
258 if let StateEvent::Value(val) = &msg.event {
259 print_msg_as_json(val);
260 } else {
261 print_msg_as_json(Value::Null);
262 }
263 }
264 (true, false) => print_msg_as_json(msg),
265 (false, true) => {
266 if let StateEvent::Value(val) = &msg.event {
267 println!("{}", val);
268 } else {
269 println!("{}", Value::Null);
270 }
271 }
272 (false, false) => println!("{msg}"),
273 }
274}
275
276fn print_ls(msg: &LsState, json: bool) {
277 if json {
278 print_msg_as_json(msg);
279 } else {
280 println!("{msg}");
281 }
282}
283
284fn print_err(msg: &Err, json: bool) {
285 if json {
286 print_msg_as_json(msg);
287 } else {
288 eprintln!("{msg}");
289 }
290}
291
292fn print_msg_as_json(msg: impl Serialize) {
293 match serde_json::to_string(&msg) {
294 Ok(json) => println!("{json}"),
295 Err(e) => {
296 eprintln!("Error converting message to json: {e}");
297 }
298 }
299}
300
301fn print_state_change(msg: &State, json: bool) {
302 if json {
303 if let StateEvent::Value(val) = &msg.event {
304 print_msg_as_json(val);
305 }
306 } else if let StateEvent::Value(val) = &msg.event {
307 println!("{}", val);
308 }
309}
310
311fn print_state_del(msg: &State, json: bool) {
312 if json {
313 if let StateEvent::Deleted(val) = &msg.event {
314 print_msg_as_json(val);
315 }
316 } else if let StateEvent::Deleted(val) = &msg.event {
317 println!("{}", val);
318 }
319}
320
321fn print_pstate_change(msg: &PState, json: bool) {
322 if json {
323 if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
324 print_msg_as_json(kvps);
325 }
326 } else if let PStateEvent::KeyValuePairs(kvps) = &msg.event {
327 for kvp in kvps {
328 println!("{kvp}");
329 }
330 }
331}
332
333fn print_pstate_del(msg: &PState, json: bool) {
334 if json {
335 if let PStateEvent::Deleted(kvps) = &msg.event {
336 print_msg_as_json(kvps);
337 }
338 } else if let PStateEvent::Deleted(kvps) = &msg.event {
339 for kvp in kvps {
340 println!("{kvp}");
341 }
342 }
343}