1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
8#[derive(Debug, Clone, PartialEq)]
10pub enum Command {
11 RoomCreate,
12 RoomDelete(u64),
13 RoomList,
14 RoomInfo(u64),
15 RoomLabel(u64, String),
16 RoomFind(String),
17 Set { room_id: u64, container: String, key: String, value: Value },
18 Del { room_id: u64, container: String, key: String },
19 Get { room_id: u64, container: String, key: String },
20 Version(u64),
21 SetJwtKey(String),
22 TxBegin(u64),
23 TxEnd(u64),
24 TxAbort(u64),
25 TokenGen { room_id: u64, containers: Vec<String> },
26 Save { room_id: u64 },
27 Load { room_id: u64 },
28 PersistSet { room_id: u64, container: String, key: String },
29 PersistUnset { room_id: u64, container: String, key: String },
30 PersistGet { room_id: u64, container: String, key: String },
31}
32
33#[derive(Debug, Clone, PartialEq)]
35pub enum Response {
36 Ok(Option<String>),
37 Error(String),
38}
39
40pub fn parse_command(line: &str) -> Result<Command, String> {
43 let mut remainder = line.trim_start();
44
45 let (cmd, rest) = match take_token(remainder) {
46 Ok((token, rest)) => (token, rest),
47 Err(e) => return Err(e),
48 };
49 remainder = rest;
50
51 match cmd.as_ref() {
52 "ROOM.CREATE" => {
53 if !remainder.trim().is_empty() {
54 return Err("ERROR extra_arguments".into());
55 }
56 Ok(Command::RoomCreate)
57 }
58 "ROOM.DELETE" => {
59 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
60 if !rest.trim().is_empty() {
61 return Err("ERROR extra_arguments".into());
62 }
63 Ok(Command::RoomDelete(room_id))
64 }
65 "ROOM.LIST" => {
66 if !remainder.trim().is_empty() {
67 return Err("ERROR extra_arguments".into());
68 }
69 Ok(Command::RoomList)
70 }
71 "ROOM.INFO" => {
72 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
73 if !rest.trim().is_empty() {
74 return Err("ERROR extra_arguments".into());
75 }
76 Ok(Command::RoomInfo(room_id))
77 }
78 "ROOM.LABEL" => {
79 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
80 let (label, rest) = take_token(rest)?;
81 if label.trim().is_empty() {
82 return Err("ERROR label_invalid".into());
83 }
84 if !rest.trim().is_empty() {
85 return Err("ERROR extra_arguments".into());
86 }
87 Ok(Command::RoomLabel(room_id, label))
88 }
89 "ROOM.FIND" => {
90 let (label, rest) = take_token(remainder)?;
91 if !rest.trim().is_empty() {
92 return Err("ERROR extra_arguments".into());
93 }
94 Ok(Command::RoomFind(label))
95 }
96 "SET" => {
97 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
98 let (container, rest) = take_token(rest)?;
99 let (key, rest) = take_token(rest)?;
100 let value_json = rest.trim_start();
101 if value_json.is_empty() {
102 return Err("ERROR missing_value".into());
103 }
104 let value: Value = match serde_json::from_str(value_json) {
105 Ok(v) => v,
106 Err(err) => return Err(format!("ERROR invalid_json {}", err)),
107 };
108 Ok(Command::Set { room_id, container, key, value })
109 }
110 "DEL" => {
111 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
112 let (container, rest) = take_token(rest)?;
113 let (key, rest) = take_token(rest)?;
114 if !rest.trim().is_empty() {
115 return Err("ERROR extra_arguments".into());
116 }
117 Ok(Command::Del { room_id, container, key })
118 }
119 "GET" => {
120 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
121 let (container, rest) = take_token(rest)?;
122 let (key, rest) = take_token(rest)?;
123 if !rest.trim().is_empty() {
124 return Err("ERROR extra_arguments".into());
125 }
126 Ok(Command::Get { room_id, container, key })
127 }
128 "VERSION" => {
129 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
130 if !rest.trim().is_empty() {
131 return Err("ERROR extra_arguments".into());
132 }
133 Ok(Command::Version(room_id))
134 }
135 "SET.JWTKEY" => {
136 let (key, rest) = take_token(remainder)?;
137 if !rest.trim().is_empty() {
138 return Err("ERROR extra_arguments".into());
139 }
140 Ok(Command::SetJwtKey(key))
141 }
142 "TX.BEGIN" => {
143 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
144 if !rest.trim().is_empty() {
145 return Err("ERROR extra_arguments".into());
146 }
147 Ok(Command::TxBegin(room_id))
148 }
149 "TX.END" => {
150 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
151 if !rest.trim().is_empty() {
152 return Err("ERROR extra_arguments".into());
153 }
154 Ok(Command::TxEnd(room_id))
155 }
156 "TX.ABORT" => {
157 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
158 if !rest.trim().is_empty() {
159 return Err("ERROR extra_arguments".into());
160 }
161 Ok(Command::TxAbort(room_id))
162 }
163 "TOKEN.GEN" => {
164 let (room_id_str, rest) = take_token(remainder)?;
165 let room_id = match room_id_str.parse::<u64>() {
166 Ok(id) => id,
167 Err(_) => return Err("ERROR invalid_room_id".into()),
168 };
169 let mut containers = Vec::new();
170 let mut leftover = rest;
171 while !leftover.trim().is_empty() {
172 let (tok, rem) = take_token(leftover)?;
173 containers.push(tok);
174 leftover = rem;
175 }
176 Ok(Command::TokenGen { room_id, containers })
177 }
178 "SAVE" => {
179 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
180 if !rest.trim().is_empty() {
181 return Err("ERROR extra_arguments".into());
182 }
183 Ok(Command::Save { room_id })
184 }
185 "LOAD" => {
186 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
187 if !rest.trim().is_empty() {
188 return Err("ERROR extra_arguments".into());
189 }
190 Ok(Command::Load { room_id })
191 }
192 "PERSIST.SET" => {
193 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
194 let (container, rest) = take_token(rest)?;
195 let (key, rest) = take_token(rest)?;
196 if !rest.trim().is_empty() {
197 return Err("ERROR extra_arguments".into());
198 }
199 Ok(Command::PersistSet { room_id, container, key })
200 }
201 "PERSIST.UNSET" => {
202 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
203 let (container, rest) = take_token(rest)?;
204 let (key, rest) = take_token(rest)?;
205 if !rest.trim().is_empty() {
206 return Err("ERROR extra_arguments".into());
207 }
208 Ok(Command::PersistUnset { room_id, container, key })
209 }
210 "PERSIST.GET" => {
211 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
212 let (container, rest) = take_token(rest)?;
213 let (key, rest) = take_token(rest)?;
214 if !rest.trim().is_empty() {
215 return Err("ERROR extra_arguments".into());
216 }
217 Ok(Command::PersistGet { room_id, container, key })
218 }
219 _ => Err("ERROR unknown_command".into()),
220 }
221}
222
223pub fn format_command(cmd: &Command) -> String {
225 match cmd {
226 Command::RoomCreate => "ROOM.CREATE".into(),
227 Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
228 Command::RoomList => "ROOM.LIST".into(),
229 Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
230 Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
231 Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
232 Command::Set { room_id, container, key, value } => {
233 let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
234 format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
235 }
236 Command::Del { room_id, container, key } => {
237 format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
238 }
239 Command::Get { room_id, container, key } => {
240 format!("GET {} {} {}", room_id, format_token(container), format_token(key))
241 }
242 Command::Version(id) => format!("VERSION {}", id),
243 Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
244 Command::TxBegin(id) => format!("TX.BEGIN {}", id),
245 Command::TxEnd(id) => format!("TX.END {}", id),
246 Command::TxAbort(id) => format!("TX.ABORT {}", id),
247 Command::TokenGen { room_id, containers } => {
248 let mut parts = vec![room_id.to_string()];
249 parts.extend(containers.iter().map(|c| format_token(c)));
250 format!("TOKEN.GEN {}", parts.join(" "))
251 }
252 Command::Save { room_id } => {
253 format!("SAVE {}", room_id)
254 }
255 Command::Load { room_id } => {
256 format!("LOAD {}", room_id)
257 }
258 Command::PersistSet { room_id, container, key } => {
259 format!("PERSIST.SET {} {} {}", room_id, format_token(container), format_token(key))
260 }
261 Command::PersistUnset { room_id, container, key } => {
262 format!("PERSIST.UNSET {} {} {}", room_id, format_token(container), format_token(key))
263 }
264 Command::PersistGet { room_id, container, key } => {
265 format!("PERSIST.GET {} {} {}", room_id, format_token(container), format_token(key))
266 }
267 }
268}
269
270pub fn parse_response(line: &str) -> Result<Response, String> {
272 let s = line.trim();
273 if s.starts_with("OK") {
274 let rest = s[2..].trim_start();
275 if rest.is_empty() {
276 Ok(Response::Ok(None))
277 } else {
278 Ok(Response::Ok(Some(rest.to_string())))
279 }
280 } else if s.starts_with("ERROR") {
281 let rest = s[5..].trim_start();
282 Ok(Response::Error(rest.to_string()))
283 } else {
284 Err("ERROR invalid_response".into())
285 }
286}
287
288pub fn format_response(resp: &Response) -> String {
290 match resp {
291 Response::Ok(None) => "OK".into(),
292 Response::Ok(Some(p)) => format!("OK {}", p),
293 Response::Error(msg) => format!("ERROR {}", msg),
294 }
295}
296
297fn take_token(input: &str) -> Result<(String, &str), String> {
300 let input = input.trim_start();
301 if input.is_empty() {
302 return Err("ERROR missing_argument".into());
303 }
304
305 if input.starts_with('"') {
306 let mut buf = String::new();
307 let mut escaped = false;
308 for (i, c) in input[1..].char_indices() {
309 if escaped {
310 match c {
311 '\\' => buf.push('\\'),
312 '"' => buf.push('"'),
313 'n' => buf.push('\n'),
314 'r' => buf.push('\r'),
315 't' => buf.push('\t'),
316 other => buf.push(other),
317 }
318 escaped = false;
319 continue;
320 }
321
322 if c == '\\' {
323 escaped = true;
324 continue;
325 }
326
327 if c == '"' {
328 let end = 1 + i + c.len_utf8();
329 let rest = &input[end..];
330 return Ok((buf, rest));
331 }
332
333 buf.push(c);
334 }
335
336 Err("ERROR invalid_argument".into())
337 } else {
338 let mut end = input.len();
339 for (i, c) in input.char_indices() {
340 if c.is_whitespace() {
341 end = i;
342 break;
343 }
344 }
345 let token = input[..end].to_string();
346 let rest = &input[end..];
347 Ok((token, rest))
348 }
349}
350
351fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
352 let (room_id, rest) = take_token(remainder)?;
353 let parsed = room_id
354 .parse::<u64>()
355 .map_err(|_| "ERROR invalid_room_id".to_string())?;
356 Ok((parsed, rest))
357}
358
359pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
362 let mut stream = TcpStream::connect(addr).await?;
363 stream.write_all(api_key.as_bytes()).await?;
364 stream.write_all(b"\n").await?;
365 stream.flush().await?;
366 Ok(stream)
367}
368
369pub async fn connect_with_retry(
371 addr: &str,
372 api_key: &str,
373 retries: usize,
374 delay: Duration,
375) -> Result<TcpStream, std::io::Error> {
376 for attempt in 0..=retries {
377 match connect_with_auth(addr, api_key).await {
378 Ok(s) => return Ok(s),
379 Err(e) => {
380 if attempt == retries {
381 return Err(e);
382 }
383 sleep(delay).await;
384 }
385 }
386 }
387 unreachable!()
388}
389
390pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
392 let s = format_command(cmd);
393 stream.write_all(s.as_bytes()).await?;
394 stream.write_all(b"\n").await?;
395 stream.flush().await?;
396 Ok(())
397}
398
399pub fn start_long_lived_tcp(
405 addr: String,
406 api_key: String,
407) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
408 let (tx_in, rx_in) = mpsc::channel(128);
409 let (tx_cmd, rx_cmd) = mpsc::channel(128);
410
411 tokio::spawn(async move {
412 let mut rx_cmd = rx_cmd;
414
415 loop {
416 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
417 Ok(stream) => {
418 let (read_half, mut write_half) = tokio::io::split(stream);
419 let mut lines = BufReader::new(read_half).lines();
420
421 loop {
422 tokio::select! {
423 res = lines.next_line() => {
424 match res {
425 Ok(Some(line)) => {
426 if tx_in.send(line).await.is_err() {
427 return;
429 }
430 }
431 Ok(None) => {
432 break;
434 }
435 Err(_) => {
436 break;
438 }
439 }
440 }
441 maybe_cmd = rx_cmd.recv() => {
442 match maybe_cmd {
443 Some(cmd) => {
444 let s = format_command(&cmd);
445 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
446 if let Err(_) = write_half.write_all(b"\n").await { break; }
447 if let Err(_) = write_half.flush().await { break; }
448 }
449 None => {
450 return;
452 }
453 }
454 }
455 }
456 }
457 }
459 Err(_) => {
460 sleep(Duration::from_secs(1)).await;
462 }
463 }
464 }
465 });
466
467 (rx_in, tx_cmd)
468}
469
470pub struct Client {
473 pub incoming: mpsc::Receiver<String>,
475 pub outgoing: mpsc::Sender<Command>,
477 shutdown_tx: mpsc::Sender<()>,
478 handle: Option<JoinHandle<()>>,
479}
480
481impl Client {
482 pub fn start(addr: String, api_key: String) -> Self {
485 let (tx_in, rx_in) = mpsc::channel(128);
486 let (tx_cmd, rx_cmd) = mpsc::channel(128);
487 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
488
489 let handle = tokio::spawn(async move {
490 let mut rx_cmd = rx_cmd;
491
492 loop {
493 if shutdown_rx.try_recv().is_ok() {
495 return;
496 }
497
498 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
499 Ok(stream) => {
500 let (read_half, mut write_half) = tokio::io::split(stream);
501 let mut lines = BufReader::new(read_half).lines();
502
503 loop {
504 tokio::select! {
505 _ = shutdown_rx.recv() => {
506 return;
507 }
508 res = lines.next_line() => {
509 match res {
510 Ok(Some(line)) => {
511 if tx_in.send(line).await.is_err() {
512 return;
513 }
514 }
515 Ok(None) => { break; }
516 Err(_) => { break; }
517 }
518 }
519 maybe_cmd = rx_cmd.recv() => {
520 match maybe_cmd {
521 Some(cmd) => {
522 let s = format_command(&cmd);
523 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
524 if let Err(_) = write_half.write_all(b"\n").await { break; }
525 if let Err(_) = write_half.flush().await { break; }
526 }
527 None => {
528 return;
530 }
531 }
532 }
533 }
534 }
535 }
537 Err(_) => {
538 tokio::select! {
540 _ = shutdown_rx.recv() => return,
541 _ = sleep(Duration::from_secs(1)) => {}
542 }
543 }
544 }
545 }
546 });
547
548 Client {
549 incoming: rx_in,
550 outgoing: tx_cmd,
551 shutdown_tx,
552 handle: Some(handle),
553 }
554 }
555
556 pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
558 self.outgoing.send(cmd).await
559 }
560
561 pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
563 self.outgoing.try_send(cmd)
564 }
565
566 pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
568 let _ = self.shutdown_tx.send(()).await;
569 if let Some(handle) = self.handle.take() {
570 handle.await
571 } else {
572 Ok(())
573 }
574 }
575}
576
577impl Drop for Client {
578 fn drop(&mut self) {
579 let _ = self.shutdown_tx.try_send(());
581 if let Some(handle) = self.handle.take() {
582 handle.abort();
583 }
584 }
585}
586
587fn needs_quote(s: &str) -> bool {
588 s.is_empty() || s.chars().any(|c| c.is_whitespace() || c == '"' || c == '\\')
589}
590
591fn escape_token(s: &str) -> String {
592 let mut out = String::new();
593 for c in s.chars() {
594 match c {
595 '\\' => out.push_str("\\\\"),
596 '"' => out.push_str("\\\"") ,
597 '\n' => out.push_str("\\n"),
598 '\r' => out.push_str("\\r"),
599 '\t' => out.push_str("\\t"),
600 other => out.push(other),
601 }
602 }
603 out
604}
605
606fn format_token(s: &str) -> String {
607 if needs_quote(s) {
608 format!("\"{}\"", escape_token(s))
609 } else {
610 s.to_string()
611 }
612}