1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
/// A request in the line-oriented text protocol.
///
/// Each variant maps to one command keyword understood by [`parse_command`]
/// and produced by [`format_command`]. The wire form of each variant is shown
/// on the variant itself.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    /// `ROOM.CREATE` (no arguments).
    RoomCreate,
    /// `ROOM.DELETE <room_id>`.
    RoomDelete(u64),
    /// `ROOM.LIST` (no arguments).
    RoomList,
    /// `ROOM.INFO <room_id>`.
    RoomInfo(u64),
    /// `ROOM.LABEL <room_id> <label>`; the parser rejects a blank label.
    RoomLabel(u64, String),
    /// `ROOM.FIND <label>`.
    RoomFind(String),
    /// `SET <room_id> <container> <key> <json>`; the JSON value is the rest of the line.
    Set { room_id: u64, container: String, key: String, value: Value },
    /// `DEL <room_id> <container> <key>`.
    Del { room_id: u64, container: String, key: String },
    /// `GET <room_id> <container> <key>`.
    Get { room_id: u64, container: String, key: String },
    /// `VERSION <room_id>`.
    Version(u64),
    /// `SET.JWTKEY <key>`.
    SetJwtKey(String),
    /// `TX.BEGIN <room_id>`.
    TxBegin(u64),
    /// `TX.END <room_id>`.
    TxEnd(u64),
    /// `TX.ABORT <room_id>`.
    TxAbort(u64),
    /// `TOKEN.GEN <room_id> [<container> ...]`; zero or more container tokens.
    TokenGen { room_id: u64, containers: Vec<String> },
}
27
/// A reply line in the text protocol, as read by [`parse_response`] and
/// written by [`format_response`].
#[derive(Debug, Clone, PartialEq)]
pub enum Response {
    /// An `OK` line; carries the text following the keyword, if any.
    Ok(Option<String>),
    /// An `ERROR` line; carries the text following the keyword (possibly empty).
    Error(String),
}
34
35pub fn parse_command(line: &str) -> Result<Command, String> {
38 let mut remainder = line.trim_start();
39
40 let (cmd, rest) = match take_token(remainder) {
41 Ok((token, rest)) => (token, rest),
42 Err(e) => return Err(e),
43 };
44 remainder = rest;
45
46 match cmd.as_ref() {
47 "ROOM.CREATE" => {
48 if !remainder.trim().is_empty() {
49 return Err("ERROR extra_arguments".into());
50 }
51 Ok(Command::RoomCreate)
52 }
53 "ROOM.DELETE" => {
54 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
55 if !rest.trim().is_empty() {
56 return Err("ERROR extra_arguments".into());
57 }
58 Ok(Command::RoomDelete(room_id))
59 }
60 "ROOM.LIST" => {
61 if !remainder.trim().is_empty() {
62 return Err("ERROR extra_arguments".into());
63 }
64 Ok(Command::RoomList)
65 }
66 "ROOM.INFO" => {
67 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
68 if !rest.trim().is_empty() {
69 return Err("ERROR extra_arguments".into());
70 }
71 Ok(Command::RoomInfo(room_id))
72 }
73 "ROOM.LABEL" => {
74 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
75 let (label, rest) = take_token(rest)?;
76 if label.trim().is_empty() {
77 return Err("ERROR label_invalid".into());
78 }
79 if !rest.trim().is_empty() {
80 return Err("ERROR extra_arguments".into());
81 }
82 Ok(Command::RoomLabel(room_id, label))
83 }
84 "ROOM.FIND" => {
85 let (label, rest) = take_token(remainder)?;
86 if !rest.trim().is_empty() {
87 return Err("ERROR extra_arguments".into());
88 }
89 Ok(Command::RoomFind(label))
90 }
91 "SET" => {
92 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
93 let (container, rest) = take_token(rest)?;
94 let (key, rest) = take_token(rest)?;
95 let value_json = rest.trim_start();
96 if value_json.is_empty() {
97 return Err("ERROR missing_value".into());
98 }
99 let value: Value = match serde_json::from_str(value_json) {
100 Ok(v) => v,
101 Err(err) => return Err(format!("ERROR invalid_json {}", err)),
102 };
103 Ok(Command::Set { room_id, container, key, value })
104 }
105 "DEL" => {
106 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
107 let (container, rest) = take_token(rest)?;
108 let (key, rest) = take_token(rest)?;
109 if !rest.trim().is_empty() {
110 return Err("ERROR extra_arguments".into());
111 }
112 Ok(Command::Del { room_id, container, key })
113 }
114 "GET" => {
115 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
116 let (container, rest) = take_token(rest)?;
117 let (key, rest) = take_token(rest)?;
118 if !rest.trim().is_empty() {
119 return Err("ERROR extra_arguments".into());
120 }
121 Ok(Command::Get { room_id, container, key })
122 }
123 "VERSION" => {
124 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
125 if !rest.trim().is_empty() {
126 return Err("ERROR extra_arguments".into());
127 }
128 Ok(Command::Version(room_id))
129 }
130 "SET.JWTKEY" => {
131 let (key, rest) = take_token(remainder)?;
132 if !rest.trim().is_empty() {
133 return Err("ERROR extra_arguments".into());
134 }
135 Ok(Command::SetJwtKey(key))
136 }
137 "TX.BEGIN" => {
138 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
139 if !rest.trim().is_empty() {
140 return Err("ERROR extra_arguments".into());
141 }
142 Ok(Command::TxBegin(room_id))
143 }
144 "TX.END" => {
145 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
146 if !rest.trim().is_empty() {
147 return Err("ERROR extra_arguments".into());
148 }
149 Ok(Command::TxEnd(room_id))
150 }
151 "TX.ABORT" => {
152 let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
153 if !rest.trim().is_empty() {
154 return Err("ERROR extra_arguments".into());
155 }
156 Ok(Command::TxAbort(room_id))
157 }
158 "TOKEN.GEN" => {
159 let (room_id_str, rest) = take_token(remainder)?;
160 let room_id = match room_id_str.parse::<u64>() {
161 Ok(id) => id,
162 Err(_) => return Err("ERROR invalid_room_id".into()),
163 };
164 let mut containers = Vec::new();
165 let mut leftover = rest;
166 while !leftover.trim().is_empty() {
167 let (tok, rem) = take_token(leftover)?;
168 containers.push(tok);
169 leftover = rem;
170 }
171 Ok(Command::TokenGen { room_id, containers })
172 }
173 _ => Err("ERROR unknown_command".into()),
174 }
175}
176
177pub fn format_command(cmd: &Command) -> String {
179 match cmd {
180 Command::RoomCreate => "ROOM.CREATE".into(),
181 Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
182 Command::RoomList => "ROOM.LIST".into(),
183 Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
184 Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
185 Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
186 Command::Set { room_id, container, key, value } => {
187 let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
188 format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
189 }
190 Command::Del { room_id, container, key } => {
191 format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
192 }
193 Command::Get { room_id, container, key } => {
194 format!("GET {} {} {}", room_id, format_token(container), format_token(key))
195 }
196 Command::Version(id) => format!("VERSION {}", id),
197 Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
198 Command::TxBegin(id) => format!("TX.BEGIN {}", id),
199 Command::TxEnd(id) => format!("TX.END {}", id),
200 Command::TxAbort(id) => format!("TX.ABORT {}", id),
201 Command::TokenGen { room_id, containers } => {
202 let mut parts = vec![room_id.to_string()];
203 parts.extend(containers.iter().map(|c| format_token(c)));
204 format!("TOKEN.GEN {}", parts.join(" "))
205 }
206 }
207}
208
209pub fn parse_response(line: &str) -> Result<Response, String> {
211 let s = line.trim();
212 if s.starts_with("OK") {
213 let rest = s[2..].trim_start();
214 if rest.is_empty() {
215 Ok(Response::Ok(None))
216 } else {
217 Ok(Response::Ok(Some(rest.to_string())))
218 }
219 } else if s.starts_with("ERROR") {
220 let rest = s[5..].trim_start();
221 Ok(Response::Error(rest.to_string()))
222 } else {
223 Err("ERROR invalid_response".into())
224 }
225}
226
227pub fn format_response(resp: &Response) -> String {
229 match resp {
230 Response::Ok(None) => "OK".into(),
231 Response::Ok(Some(p)) => format!("OK {}", p),
232 Response::Error(msg) => format!("ERROR {}", msg),
233 }
234}
235
/// Splits the next token off the front of `input`.
///
/// Leading whitespace is skipped. A token is either:
/// - bare: everything up to (not including) the next whitespace character, or
/// - quoted: text between double quotes, where a backslash escapes the next
///   character (`\n`, `\r`, `\t` become the control characters; any other
///   escaped character stands for itself).
///
/// Returns the decoded token and the unconsumed remainder of `input` (which
/// still carries any whitespace that followed the token).
///
/// # Errors
///
/// - `ERROR missing_argument` when `input` is empty or all whitespace.
/// - `ERROR invalid_argument` when a quoted token is never closed (including
///   a dangling backslash at the end of the input).
fn take_token(input: &str) -> Result<(String, &str), String> {
    let input = input.trim_start();
    if input.is_empty() {
        return Err("ERROR missing_argument".into());
    }

    match input.strip_prefix('"') {
        None => {
            // Bare token: cut at the first whitespace, or take everything.
            let cut = input.find(char::is_whitespace).unwrap_or(input.len());
            let (token, rest) = input.split_at(cut);
            Ok((token.to_string(), rest))
        }
        Some(body) => {
            let mut token = String::new();
            let mut chars = body.char_indices();
            while let Some((idx, ch)) = chars.next() {
                match ch {
                    // Closing quote is one byte wide, so the remainder starts right after it.
                    '"' => return Ok((token, &body[idx + 1..])),
                    '\\' => match chars.next() {
                        Some((_, 'n')) => token.push('\n'),
                        Some((_, 'r')) => token.push('\r'),
                        Some((_, 't')) => token.push('\t'),
                        Some((_, literal)) => token.push(literal),
                        // Backslash was the last character: the quote can never close.
                        None => break,
                    },
                    plain => token.push(plain),
                }
            }
            Err("ERROR invalid_argument".into())
        }
    }
}
289
290fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
291 let (room_id, rest) = take_token(remainder)?;
292 let parsed = room_id
293 .parse::<u64>()
294 .map_err(|_| "ERROR invalid_room_id".to_string())?;
295 Ok((parsed, rest))
296}
297
298pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
301 let mut stream = TcpStream::connect(addr).await?;
302 stream.write_all(api_key.as_bytes()).await?;
303 stream.write_all(b"\n").await?;
304 stream.flush().await?;
305 Ok(stream)
306}
307
308pub async fn connect_with_retry(
310 addr: &str,
311 api_key: &str,
312 retries: usize,
313 delay: Duration,
314) -> Result<TcpStream, std::io::Error> {
315 for attempt in 0..=retries {
316 match connect_with_auth(addr, api_key).await {
317 Ok(s) => return Ok(s),
318 Err(e) => {
319 if attempt == retries {
320 return Err(e);
321 }
322 sleep(delay).await;
323 }
324 }
325 }
326 unreachable!()
327}
328
329pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
331 let s = format_command(cmd);
332 stream.write_all(s.as_bytes()).await?;
333 stream.write_all(b"\n").await?;
334 stream.flush().await?;
335 Ok(())
336}
337
338pub fn start_long_lived_tcp(
344 addr: String,
345 api_key: String,
346) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
347 let (tx_in, rx_in) = mpsc::channel(128);
348 let (tx_cmd, rx_cmd) = mpsc::channel(128);
349
350 tokio::spawn(async move {
351 let mut rx_cmd = rx_cmd;
353
354 loop {
355 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
356 Ok(stream) => {
357 let (read_half, mut write_half) = tokio::io::split(stream);
358 let mut lines = BufReader::new(read_half).lines();
359
360 loop {
361 tokio::select! {
362 res = lines.next_line() => {
363 match res {
364 Ok(Some(line)) => {
365 if tx_in.send(line).await.is_err() {
366 return;
368 }
369 }
370 Ok(None) => {
371 break;
373 }
374 Err(_) => {
375 break;
377 }
378 }
379 }
380 maybe_cmd = rx_cmd.recv() => {
381 match maybe_cmd {
382 Some(cmd) => {
383 let s = format_command(&cmd);
384 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
385 if let Err(_) = write_half.write_all(b"\n").await { break; }
386 if let Err(_) = write_half.flush().await { break; }
387 }
388 None => {
389 return;
391 }
392 }
393 }
394 }
395 }
396 }
398 Err(_) => {
399 sleep(Duration::from_secs(1)).await;
401 }
402 }
403 }
404 });
405
406 (rx_in, tx_cmd)
407}
408
/// Handle to a background connection task created by [`Client::start`].
///
/// Dropping the handle aborts the task; `shutdown` stops it and waits for it
/// to finish.
pub struct Client {
    /// Receives each line the background task reads from the server.
    pub incoming: mpsc::Receiver<String>,
    /// Queue of commands for the background task to write to the server.
    pub outgoing: mpsc::Sender<Command>,
    /// Signals the background task to stop; used by `shutdown` and `Drop`.
    shutdown_tx: mpsc::Sender<()>,
    /// Join handle of the background task; `None` once `shutdown` or `Drop` has taken it.
    handle: Option<JoinHandle<()>>,
}
419
420impl Client {
421 pub fn start(addr: String, api_key: String) -> Self {
424 let (tx_in, rx_in) = mpsc::channel(128);
425 let (tx_cmd, rx_cmd) = mpsc::channel(128);
426 let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
427
428 let handle = tokio::spawn(async move {
429 let mut rx_cmd = rx_cmd;
430
431 loop {
432 if shutdown_rx.try_recv().is_ok() {
434 return;
435 }
436
437 match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
438 Ok(stream) => {
439 let (read_half, mut write_half) = tokio::io::split(stream);
440 let mut lines = BufReader::new(read_half).lines();
441
442 loop {
443 tokio::select! {
444 _ = shutdown_rx.recv() => {
445 return;
446 }
447 res = lines.next_line() => {
448 match res {
449 Ok(Some(line)) => {
450 if tx_in.send(line).await.is_err() {
451 return;
452 }
453 }
454 Ok(None) => { break; }
455 Err(_) => { break; }
456 }
457 }
458 maybe_cmd = rx_cmd.recv() => {
459 match maybe_cmd {
460 Some(cmd) => {
461 let s = format_command(&cmd);
462 if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
463 if let Err(_) = write_half.write_all(b"\n").await { break; }
464 if let Err(_) = write_half.flush().await { break; }
465 }
466 None => {
467 return;
469 }
470 }
471 }
472 }
473 }
474 }
476 Err(_) => {
477 tokio::select! {
479 _ = shutdown_rx.recv() => return,
480 _ = sleep(Duration::from_secs(1)) => {}
481 }
482 }
483 }
484 }
485 });
486
487 Client {
488 incoming: rx_in,
489 outgoing: tx_cmd,
490 shutdown_tx,
491 handle: Some(handle),
492 }
493 }
494
495 pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
497 self.outgoing.send(cmd).await
498 }
499
500 pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
502 self.outgoing.try_send(cmd)
503 }
504
505 pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
507 let _ = self.shutdown_tx.send(()).await;
508 if let Some(handle) = self.handle.take() {
509 handle.await
510 } else {
511 Ok(())
512 }
513 }
514}
515
516impl Drop for Client {
517 fn drop(&mut self) {
518 let _ = self.shutdown_tx.try_send(());
520 if let Some(handle) = self.handle.take() {
521 handle.abort();
522 }
523 }
524}
525
/// Reports whether `s` must be written as a quoted token.
///
/// That is the case when `s` is empty or contains whitespace, a double quote,
/// or a backslash.
fn needs_quote(s: &str) -> bool {
    if s.is_empty() {
        return true;
    }
    s.contains(|ch: char| matches!(ch, '"' | '\\') || ch.is_whitespace())
}
529
/// Escapes `s` for use inside a double-quoted token.
///
/// Backslash and double quote are prefixed with a backslash; newline, carriage
/// return and tab are written as `\n`, `\r` and `\t`. All other characters are
/// copied unchanged.
fn escape_token(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
544
545fn format_token(s: &str) -> String {
546 if needs_quote(s) {
547 format!("\"{}\"", escape_token(s))
548 } else {
549 s.to_string()
550 }
551}