1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
/// A single protocol command, one variant per wire keyword.
///
/// [`parse_command`] builds these from a text line and [`format_command`]
/// renders them back to text. The keyword and argument order noted on each
/// variant are the ones those two functions use.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    /// `ROOM.CREATE` — takes no arguments.
    RoomCreate,
    /// `ROOM.DELETE <room_id>`.
    RoomDelete(u64),
    /// `ROOM.LIST` — takes no arguments.
    RoomList,
    /// `ROOM.INFO <room_id>`.
    RoomInfo(u64),
    /// `ROOM.LABEL <room_id> <label>`; the parser rejects a blank label.
    RoomLabel(u64, String),
    /// `ROOM.FIND <label>`.
    RoomFind(String),
    /// `SET <room_id> <container> <key> <json>`; the rest of the line is parsed as JSON.
    Set { room_id: u64, container: String, key: String, value: Value },
    /// `DEL <room_id> <container> <key>`.
    Del { room_id: u64, container: String, key: String },
    /// `GET <room_id> <container> <key>`.
    Get { room_id: u64, container: String, key: String },
    /// `SERVER.SET <room_id> <key> <json>`; the rest of the line is parsed as JSON.
    ServerSet { room_id: u64, key: String, value: Value },
    /// `SERVER.DEL <room_id> <key>`.
    ServerDel { room_id: u64, key: String },
    /// `SERVER.GET <room_id> <key>`.
    ServerGet { room_id: u64, key: String },
    /// `VERSION <room_id>`.
    Version(u64),
    /// `SET.JWTKEY <key>`.
    SetJwtKey(String),
    /// `TX.BEGIN <room_id>`.
    TxBegin(u64),
    /// `TX.END <room_id>`.
    TxEnd(u64),
    /// `TX.ABORT <room_id>`.
    TxAbort(u64),
    /// `TOKEN.GEN <room_id> [container ...]`; zero or more container tokens may follow.
    TokenGen { room_id: u64, containers: Vec<String> },
    /// `SAVE <room_id>`.
    Save { room_id: u64 },
    /// `LOAD <room_id>`.
    Load { room_id: u64 },
    /// `PERSIST.SET <room_id> <container> <key>`.
    PersistSet { room_id: u64, container: String, key: String },
    /// `PERSIST.UNSET <room_id> <container> <key>`.
    PersistUnset { room_id: u64, container: String, key: String },
    /// `PERSIST.GET <room_id> <container> <key>`.
    PersistGet { room_id: u64, container: String, key: String },
}
36
/// A reply line: `OK` with an optional payload, or `ERROR` with a message.
#[derive(Debug, Clone, PartialEq)]
pub enum Response {
    /// `OK`, with the text following the keyword (`None` when nothing follows it).
    Ok(Option<String>),
    /// `ERROR`, with the text following the keyword (may be empty).
    Error(String),
}
43
44pub fn parse_command(line: &str) -> Result<Command, String> {
47 let mut remainder = line.trim_start();
48
49 let (cmd, rest) = match take_token(remainder) {
50 Ok((token, rest)) => (token, rest),
51 Err(e) => return Err(e),
52 };
53 remainder = rest;
54
55 match cmd.as_ref() {
56 "ROOM.CREATE" => {
57 if !remainder.trim().is_empty() {
58 return Err("ERROR extra_arguments".into());
59 }
60 Ok(Command::RoomCreate)
61 }
62 "ROOM.DELETE" => {
63 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
64 if !rest.trim().is_empty() {
65 return Err("ERROR extra_arguments".into());
66 }
67 Ok(Command::RoomDelete(room_id))
68 }
69 "ROOM.LIST" => {
70 if !remainder.trim().is_empty() {
71 return Err("ERROR extra_arguments".into());
72 }
73 Ok(Command::RoomList)
74 }
75 "ROOM.INFO" => {
76 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
77 if !rest.trim().is_empty() {
78 return Err("ERROR extra_arguments".into());
79 }
80 Ok(Command::RoomInfo(room_id))
81 }
82 "ROOM.LABEL" => {
83 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
84 let (label, rest) = take_token(rest)?;
85 if label.trim().is_empty() {
86 return Err("ERROR label_invalid".into());
87 }
88 if !rest.trim().is_empty() {
89 return Err("ERROR extra_arguments".into());
90 }
91 Ok(Command::RoomLabel(room_id, label))
92 }
93 "ROOM.FIND" => {
94 let (label, rest) = take_token(remainder)?;
95 if !rest.trim().is_empty() {
96 return Err("ERROR extra_arguments".into());
97 }
98 Ok(Command::RoomFind(label))
99 }
100 "SET" => {
101 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
102 let (container, rest) = take_token(rest)?;
103 let (key, rest) = take_token(rest)?;
104 let value_json = rest.trim_start();
105 if value_json.is_empty() {
106 return Err("ERROR missing_value".into());
107 }
108 let value: Value = match serde_json::from_str(value_json) {
109 Ok(v) => v,
110 Err(err) => return Err(format!("ERROR invalid_json {}", err)),
111 };
112 Ok(Command::Set { room_id, container, key, value })
113 }
114 "DEL" => {
115 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
116 let (container, rest) = take_token(rest)?;
117 let (key, rest) = take_token(rest)?;
118 if !rest.trim().is_empty() {
119 return Err("ERROR extra_arguments".into());
120 }
121 Ok(Command::Del { room_id, container, key })
122 }
123 "GET" => {
124 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
125 let (container, rest) = take_token(rest)?;
126 let (key, rest) = take_token(rest)?;
127 if !rest.trim().is_empty() {
128 return Err("ERROR extra_arguments".into());
129 }
130 Ok(Command::Get { room_id, container, key })
131 }
132 "SERVER.SET" => {
133 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
134 let (key, rest) = take_token(rest)?;
135 let value_json = rest.trim_start();
136 if value_json.is_empty() {
137 return Err("ERROR missing_value".into());
138 }
139 let value: Value = match serde_json::from_str(value_json) {
140 Ok(v) => v,
141 Err(err) => return Err(format!("ERROR invalid_json {}", err)),
142 };
143 Ok(Command::ServerSet { room_id, key, value })
144 }
145 "SERVER.DEL" => {
146 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
147 let (key, rest) = take_token(rest)?;
148 if !rest.trim().is_empty() {
149 return Err("ERROR extra_arguments".into());
150 }
151 Ok(Command::ServerDel { room_id, key })
152 }
153 "SERVER.GET" => {
154 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
155 let (key, rest) = take_token(rest)?;
156 if !rest.trim().is_empty() {
157 return Err("ERROR extra_arguments".into());
158 }
159 Ok(Command::ServerGet { room_id, key })
160 }
161 "VERSION" => {
162 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
163 if !rest.trim().is_empty() {
164 return Err("ERROR extra_arguments".into());
165 }
166 Ok(Command::Version(room_id))
167 }
168 "SET.JWTKEY" => {
169 let (key, rest) = take_token(remainder)?;
170 if !rest.trim().is_empty() {
171 return Err("ERROR extra_arguments".into());
172 }
173 Ok(Command::SetJwtKey(key))
174 }
175 "TX.BEGIN" => {
176 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
177 if !rest.trim().is_empty() {
178 return Err("ERROR extra_arguments".into());
179 }
180 Ok(Command::TxBegin(room_id))
181 }
182 "TX.END" => {
183 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
184 if !rest.trim().is_empty() {
185 return Err("ERROR extra_arguments".into());
186 }
187 Ok(Command::TxEnd(room_id))
188 }
189 "TX.ABORT" => {
190 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
191 if !rest.trim().is_empty() {
192 return Err("ERROR extra_arguments".into());
193 }
194 Ok(Command::TxAbort(room_id))
195 }
196 "TOKEN.GEN" => {
197 let (room_id_str, rest) = take_token(remainder)?;
198 let room_id = match room_id_str.parse::<u64>() {
199 Ok(id) => id,
200 Err(_) => return Err("ERROR invalid_room_id".into()),
201 };
202 let mut containers = Vec::new();
203 let mut leftover = rest;
204 while !leftover.trim().is_empty() {
205 let (tok, rem) = take_token(leftover)?;
206 containers.push(tok);
207 leftover = rem;
208 }
209 Ok(Command::TokenGen { room_id, containers })
210 }
211 "SAVE" => {
212 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
213 if !rest.trim().is_empty() {
214 return Err("ERROR extra_arguments".into());
215 }
216 Ok(Command::Save { room_id })
217 }
218 "LOAD" => {
219 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
220 if !rest.trim().is_empty() {
221 return Err("ERROR extra_arguments".into());
222 }
223 Ok(Command::Load { room_id })
224 }
225 "PERSIST.SET" => {
226 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
227 let (container, rest) = take_token(rest)?;
228 let (key, rest) = take_token(rest)?;
229 if !rest.trim().is_empty() {
230 return Err("ERROR extra_arguments".into());
231 }
232 Ok(Command::PersistSet { room_id, container, key })
233 }
234 "PERSIST.UNSET" => {
235 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
236 let (container, rest) = take_token(rest)?;
237 let (key, rest) = take_token(rest)?;
238 if !rest.trim().is_empty() {
239 return Err("ERROR extra_arguments".into());
240 }
241 Ok(Command::PersistUnset { room_id, container, key })
242 }
243 "PERSIST.GET" => {
244 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
245 let (container, rest) = take_token(rest)?;
246 let (key, rest) = take_token(rest)?;
247 if !rest.trim().is_empty() {
248 return Err("ERROR extra_arguments".into());
249 }
250 Ok(Command::PersistGet { room_id, container, key })
251 }
252 _ => Err("ERROR unknown_command".into()),
253 }
254}
255
256pub fn format_command(cmd: &Command) -> String {
258 match cmd {
259 Command::RoomCreate => "ROOM.CREATE".into(),
260 Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
261 Command::RoomList => "ROOM.LIST".into(),
262 Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
263 Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
264 Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
265 Command::Set { room_id, container, key, value } => {
266 let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
267 format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
268 }
269 Command::Del { room_id, container, key } => {
270 format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
271 }
272 Command::Get { room_id, container, key } => {
273 format!("GET {} {} {}", room_id, format_token(container), format_token(key))
274 }
275 Command::Version(id) => format!("VERSION {}", id),
276 Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
277 Command::ServerSet { room_id, key, value } => {
278 let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
279 format!("SERVER.SET {} {} {}", room_id, format_token(key), v)
280 }
281 Command::TxBegin(id) => format!("TX.BEGIN {}", id),
282 Command::TxEnd(id) => format!("TX.END {}", id),
283 Command::TxAbort(id) => format!("TX.ABORT {}", id),
284 Command::TokenGen { room_id, containers } => {
285 let mut parts = vec![room_id.to_string()];
286 parts.extend(containers.iter().map(|c| format_token(c)));
287 format!("TOKEN.GEN {}", parts.join(" "))
288 }
289 Command::Save { room_id } => {
290 format!("SAVE {}", room_id)
291 }
292 Command::Load { room_id } => {
293 format!("LOAD {}", room_id)
294 }
295 Command::PersistSet { room_id, container, key } => {
296 format!("PERSIST.SET {} {} {}", room_id, format_token(container), format_token(key))
297 }
298 Command::PersistUnset { room_id, container, key } => {
299 format!("PERSIST.UNSET {} {} {}", room_id, format_token(container), format_token(key))
300 }
301 Command::PersistGet { room_id, container, key } => {
302 format!("PERSIST.GET {} {} {}", room_id, format_token(container), format_token(key))
303 }
304 Command::ServerDel { room_id, key } => {
305 format!("SERVER.DEL {} {}", room_id, format_token(key))
306 }
307 Command::ServerGet { room_id, key } => {
308 format!("SERVER.GET {} {}", room_id, format_token(key))
309 }
310 }
311}
312
313pub fn parse_response(line: &str) -> Result<Response, String> {
315 let s = line.trim();
316 if s.starts_with("OK") {
317 let rest = s[2..].trim_start();
318 if rest.is_empty() {
319 Ok(Response::Ok(None))
320 } else {
321 Ok(Response::Ok(Some(rest.to_string())))
322 }
323 } else if s.starts_with("ERROR") {
324 let rest = s[5..].trim_start();
325 Ok(Response::Error(rest.to_string()))
326 } else {
327 Err("ERROR invalid_response".into())
328 }
329}
330
331pub fn format_response(resp: &Response) -> String {
333 match resp {
334 Response::Ok(None) => "OK".into(),
335 Response::Ok(Some(p)) => format!("OK {}", p),
336 Response::Error(msg) => format!("ERROR {}", msg),
337 }
338}
339
/// Splits the next token off the front of `input`.
///
/// Leading whitespace is skipped. A token is either a bare word (ending at the
/// first whitespace character) or a double-quoted string in which `\n`, `\r`
/// and `\t` are decoded and any other backslash-escaped character stands for
/// itself. Returns the decoded token together with the unconsumed remainder.
///
/// # Errors
///
/// * `"ERROR missing_argument"` when nothing but whitespace is left.
/// * `"ERROR invalid_argument"` when a quoted token is never closed.
fn take_token(input: &str) -> Result<(String, &str), String> {
    let trimmed = input.trim_start();
    if trimmed.is_empty() {
        return Err("ERROR missing_argument".into());
    }

    match trimmed.strip_prefix('"') {
        // Bare word: everything up to the first whitespace character.
        None => {
            let cut = trimmed.find(char::is_whitespace).unwrap_or(trimmed.len());
            let (word, tail) = trimmed.split_at(cut);
            Ok((word.to_string(), tail))
        }
        // Quoted string: decode escapes until the closing quote.
        Some(body) => {
            let mut token = String::new();
            let mut chars = body.char_indices();
            while let Some((offset, ch)) = chars.next() {
                match ch {
                    // The closing quote is one byte wide, so the tail starts right after it.
                    '"' => return Ok((token, &body[offset + 1..])),
                    '\\' => match chars.next() {
                        Some((_, 'n')) => token.push('\n'),
                        Some((_, 'r')) => token.push('\r'),
                        Some((_, 't')) => token.push('\t'),
                        Some((_, literal)) => token.push(literal),
                        // A dangling backslash means the quote was never closed.
                        None => break,
                    },
                    plain => token.push(plain),
                }
            }
            Err("ERROR invalid_argument".into())
        }
    }
}
393
394fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
395 let (room_id, rest) = take_token(remainder)?;
396 let parsed = room_id
397 .parse::<u64>()
398 .map_err(|_| "ERROR invalid_room_id".to_string())?;
399 Ok((parsed, rest))
400}
401
402pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
405 let mut stream = TcpStream::connect(addr).await?;
406 stream.write_all(api_key.as_bytes()).await?;
407 stream.write_all(b"\n").await?;
408 stream.flush().await?;
409 Ok(stream)
410}
411
412pub async fn connect_with_retry(
414 addr: &str,
415 api_key: &str,
416 retries: usize,
417 delay: Duration,
418) -> Result<TcpStream, std::io::Error> {
419 for attempt in 0..=retries {
420 match connect_with_auth(addr, api_key).await {
421 Ok(s) => return Ok(s),
422 Err(e) => {
423 if attempt == retries {
424 return Err(e);
425 }
426 sleep(delay).await;
427 }
428 }
429 }
430 unreachable!()
431}
432
433pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
435 let s = format_command(cmd);
436 stream.write_all(s.as_bytes()).await?;
437 stream.write_all(b"\n").await?;
438 stream.flush().await?;
439 Ok(())
440}
441
442pub fn start_long_lived_tcp(
448 addr: String,
449 api_key: String,
450) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
451 let (tx_in, rx_in) = mpsc::channel(128);
452 let (tx_cmd, rx_cmd) = mpsc::channel(128);
453
454 tokio::spawn(async move {
455 let mut rx_cmd = rx_cmd;
457
458 loop {
459 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
460 Ok(stream) => {
461 let (read_half, mut write_half) = tokio::io::split(stream);
462 let mut lines = BufReader::new(read_half).lines();
463
464 loop {
465 tokio::select! {
466 res = lines.next_line() => {
467 match res {
468 Ok(Some(line)) => {
469 if tx_in.send(line).await.is_err() {
470 return;
472 }
473 }
474 Ok(None) => {
475 break;
477 }
478 Err(_) => {
479 break;
481 }
482 }
483 }
484 maybe_cmd = rx_cmd.recv() => {
485 match maybe_cmd {
486 Some(cmd) => {
487 let s = format_command(&cmd);
488 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
489 if let Err(_) = write_half.write_all(b"\n").await { break; }
490 if let Err(_) = write_half.flush().await { break; }
491 }
492 None => {
493 return;
495 }
496 }
497 }
498 }
499 }
500 }
502 Err(_) => {
503 sleep(Duration::from_secs(1)).await;
505 }
506 }
507 }
508 });
509
510 (rx_in, tx_cmd)
511}
512
/// Handle to a background connection task created by `Client::start`.
///
/// Dropping the handle signals the task and aborts it (see the `Drop` impl).
pub struct Client {
    /// Receiving end of the channel the task forwards socket lines into.
    pub incoming: mpsc::Receiver<String>,
    /// Sending end of the channel the task reads commands from.
    pub outgoing: mpsc::Sender<Command>,
    /// Used to tell the task to stop.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the spawned task; `None` once it has been taken.
    handle: Option<JoinHandle<()>>,
}
523
524impl Client {
525 pub fn start(addr: String, api_key: String) -> Self {
528 let (tx_in, rx_in) = mpsc::channel(128);
529 let (tx_cmd, rx_cmd) = mpsc::channel(128);
530 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
531
532 let handle = tokio::spawn(async move {
533 let mut rx_cmd = rx_cmd;
534
535 loop {
536 if shutdown_rx.try_recv().is_ok() {
538 return;
539 }
540
541 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
542 Ok(stream) => {
543 let (read_half, mut write_half) = tokio::io::split(stream);
544 let mut lines = BufReader::new(read_half).lines();
545
546 loop {
547 tokio::select! {
548 _ = shutdown_rx.recv() => {
549 return;
550 }
551 res = lines.next_line() => {
552 match res {
553 Ok(Some(line)) => {
554 if tx_in.send(line).await.is_err() {
555 return;
556 }
557 }
558 Ok(None) => { break; }
559 Err(_) => { break; }
560 }
561 }
562 maybe_cmd = rx_cmd.recv() => {
563 match maybe_cmd {
564 Some(cmd) => {
565 let s = format_command(&cmd);
566 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
567 if let Err(_) = write_half.write_all(b"\n").await { break; }
568 if let Err(_) = write_half.flush().await { break; }
569 }
570 None => {
571 return;
573 }
574 }
575 }
576 }
577 }
578 }
580 Err(_) => {
581 tokio::select! {
583 _ = shutdown_rx.recv() => return,
584 _ = sleep(Duration::from_secs(1)) => {}
585 }
586 }
587 }
588 }
589 });
590
591 Client {
592 incoming: rx_in,
593 outgoing: tx_cmd,
594 shutdown_tx,
595 handle: Some(handle),
596 }
597 }
598
599 pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
601 self.outgoing.send(cmd).await
602 }
603
604 pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
606 self.outgoing.try_send(cmd)
607 }
608
609 pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
611 let _ = self.shutdown_tx.send(()).await;
612 if let Some(handle) = self.handle.take() {
613 handle.await
614 } else {
615 Ok(())
616 }
617 }
618}
619
620impl Drop for Client {
621 fn drop(&mut self) {
622 let _ = self.shutdown_tx.try_send(());
624 if let Some(handle) = self.handle.take() {
625 handle.abort();
626 }
627 }
628}
629
/// Reports whether `s` must be written as a quoted token.
///
/// True for the empty string and for any string containing whitespace, a
/// double quote or a backslash.
fn needs_quote(s: &str) -> bool {
    if s.is_empty() {
        return true;
    }
    s.chars()
        .any(|ch| matches!(ch, '"' | '\\') || ch.is_whitespace())
}
633
/// Escapes `s` for use inside a double-quoted token.
///
/// Backslash and double quote are prefixed with a backslash; newline,
/// carriage return and tab become `\n`, `\r` and `\t`. Every other character
/// is copied unchanged.
fn escape_token(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
648
649fn format_token(s: &str) -> String {
650 if needs_quote(s) {
651 format!("\"{}\"", escape_token(s))
652 } else {
653 s.to_string()
654 }
655}