Skip to main content

syncpond_protocol/
protocol.rs

1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
8/// Command enum representing the app-level commands.
9#[derive(Debug, Clone, PartialEq)]
10pub enum Command {
11    RoomCreate,
12    RoomDelete(u64),
13    RoomList,
14    RoomInfo(u64),
15    RoomLabel(u64, String),
16    RoomFind(String),
17    Set { room_id: u64, container: String, key: String, value: Value },
18    Del { room_id: u64, container: String, key: String },
19    Get { room_id: u64, container: String, key: String },
20    Version(u64),
21    SetJwtKey(String),
22    TxBegin(u64),
23    TxEnd(u64),
24    TxAbort(u64),
25    TokenGen { room_id: u64, containers: Vec<String> },
26}
27
/// A single-line server response.
///
/// * `Ok(None)` is a bare `OK`; `Ok(Some(payload))` is `OK <payload>`.
/// * `Error(msg)` is `ERROR <msg>`.
///
/// Both payloads are plain strings, so the type is fully comparable and
/// hashable (usable as a `HashSet`/`HashMap` key).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Response {
    /// Success, with an optional payload string.
    Ok(Option<String>),
    /// Failure, carrying the text that followed the `ERROR` keyword.
    Error(String),
}
34
35/// Parse an incoming command line into `Command`.
36/// Errors are returned as server-style strings like `ERROR missing_argument`.
37pub fn parse_command(line: &str) -> Result<Command, String> {
38    let mut remainder = line.trim_start();
39
40    let (cmd, rest) = match take_token(remainder) {
41        Ok((token, rest)) => (token, rest),
42        Err(e) => return Err(e),
43    };
44    remainder = rest;
45
46    match cmd.as_ref() {
47        "ROOM.CREATE" => {
48            if !remainder.trim().is_empty() {
49                return Err("ERROR extra_arguments".into());
50            }
51            Ok(Command::RoomCreate)
52        }
53        "ROOM.DELETE" => {
54            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
55            if !rest.trim().is_empty() {
56                return Err("ERROR extra_arguments".into());
57            }
58            Ok(Command::RoomDelete(room_id))
59        }
60        "ROOM.LIST" => {
61            if !remainder.trim().is_empty() {
62                return Err("ERROR extra_arguments".into());
63            }
64            Ok(Command::RoomList)
65        }
66        "ROOM.INFO" => {
67            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
68            if !rest.trim().is_empty() {
69                return Err("ERROR extra_arguments".into());
70            }
71            Ok(Command::RoomInfo(room_id))
72        }
73        "ROOM.LABEL" => {
74            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
75            let (label, rest) = take_token(rest)?;
76            if label.trim().is_empty() {
77                return Err("ERROR label_invalid".into());
78            }
79            if !rest.trim().is_empty() {
80                return Err("ERROR extra_arguments".into());
81            }
82            Ok(Command::RoomLabel(room_id, label))
83        }
84        "ROOM.FIND" => {
85            let (label, rest) = take_token(remainder)?;
86            if !rest.trim().is_empty() {
87                return Err("ERROR extra_arguments".into());
88            }
89            Ok(Command::RoomFind(label))
90        }
91        "SET" => {
92            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
93            let (container, rest) = take_token(rest)?;
94            let (key, rest) = take_token(rest)?;
95            let value_json = rest.trim_start();
96            if value_json.is_empty() {
97                return Err("ERROR missing_value".into());
98            }
99            let value: Value = match serde_json::from_str(value_json) {
100                Ok(v) => v,
101                Err(err) => return Err(format!("ERROR invalid_json {}", err)),
102            };
103            Ok(Command::Set { room_id, container, key, value })
104        }
105        "DEL" => {
106            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
107            let (container, rest) = take_token(rest)?;
108            let (key, rest) = take_token(rest)?;
109            if !rest.trim().is_empty() {
110                return Err("ERROR extra_arguments".into());
111            }
112            Ok(Command::Del { room_id, container, key })
113        }
114        "GET" => {
115            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
116            let (container, rest) = take_token(rest)?;
117            let (key, rest) = take_token(rest)?;
118            if !rest.trim().is_empty() {
119                return Err("ERROR extra_arguments".into());
120            }
121            Ok(Command::Get { room_id, container, key })
122        }
123        "VERSION" => {
124            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
125            if !rest.trim().is_empty() {
126                return Err("ERROR extra_arguments".into());
127            }
128            Ok(Command::Version(room_id))
129        }
130        "SET.JWTKEY" => {
131            let (key, rest) = take_token(remainder)?;
132            if !rest.trim().is_empty() {
133                return Err("ERROR extra_arguments".into());
134            }
135            Ok(Command::SetJwtKey(key))
136        }
137        "TX.BEGIN" => {
138            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
139            if !rest.trim().is_empty() {
140                return Err("ERROR extra_arguments".into());
141            }
142            Ok(Command::TxBegin(room_id))
143        }
144        "TX.END" => {
145            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
146            if !rest.trim().is_empty() {
147                return Err("ERROR extra_arguments".into());
148            }
149            Ok(Command::TxEnd(room_id))
150        }
151        "TX.ABORT" => {
152            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
153            if !rest.trim().is_empty() {
154                return Err("ERROR extra_arguments".into());
155            }
156            Ok(Command::TxAbort(room_id))
157        }
158        "TOKEN.GEN" => {
159            let (room_id_str, rest) = take_token(remainder)?;
160            let room_id = match room_id_str.parse::<u64>() {
161                Ok(id) => id,
162                Err(_) => return Err("ERROR invalid_room_id".into()),
163            };
164            let mut containers = Vec::new();
165            let mut leftover = rest;
166            while !leftover.trim().is_empty() {
167                let (tok, rem) = take_token(leftover)?;
168                containers.push(tok);
169                leftover = rem;
170            }
171            Ok(Command::TokenGen { room_id, containers })
172        }
173        _ => Err("ERROR unknown_command".into()),
174    }
175}
176
177/// Format a `Command` into a single-line string suitable for sending to the server.
178pub fn format_command(cmd: &Command) -> String {
179    match cmd {
180        Command::RoomCreate => "ROOM.CREATE".into(),
181        Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
182        Command::RoomList => "ROOM.LIST".into(),
183        Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
184        Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
185        Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
186        Command::Set { room_id, container, key, value } => {
187            let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
188            format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
189        }
190        Command::Del { room_id, container, key } => {
191            format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
192        }
193        Command::Get { room_id, container, key } => {
194            format!("GET {} {} {}", room_id, format_token(container), format_token(key))
195        }
196        Command::Version(id) => format!("VERSION {}", id),
197        Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
198        Command::TxBegin(id) => format!("TX.BEGIN {}", id),
199        Command::TxEnd(id) => format!("TX.END {}", id),
200        Command::TxAbort(id) => format!("TX.ABORT {}", id),
201        Command::TokenGen { room_id, containers } => {
202            let mut parts = vec![room_id.to_string()];
203            parts.extend(containers.iter().map(|c| format_token(c)));
204            format!("TOKEN.GEN {}", parts.join(" "))
205        }
206    }
207}
208
209/// Parse a server response line into `Response`.
210pub fn parse_response(line: &str) -> Result<Response, String> {
211    let s = line.trim();
212    if s.starts_with("OK") {
213        let rest = s[2..].trim_start();
214        if rest.is_empty() {
215            Ok(Response::Ok(None))
216        } else {
217            Ok(Response::Ok(Some(rest.to_string())))
218        }
219    } else if s.starts_with("ERROR") {
220        let rest = s[5..].trim_start();
221        Ok(Response::Error(rest.to_string()))
222    } else {
223        Err("ERROR invalid_response".into())
224    }
225}
226
227/// Format a `Response` back to a line.
228pub fn format_response(resp: &Response) -> String {
229    match resp {
230        Response::Ok(None) => "OK".into(),
231        Response::Ok(Some(p)) => format!("OK {}", p),
232        Response::Error(msg) => format!("ERROR {}", msg),
233    }
234}
235
236// ----------------- helpers -----------------
237
/// Split the next token off the front of `input`.
///
/// Leading whitespace is skipped. A token is either
///
/// * a bare run of non-whitespace characters, or
/// * a double-quoted string in which `\\`, `\"`, `\n`, `\r` and `\t` are
///   escape sequences (a backslash before any other character yields that
///   character unchanged).
///
/// Returns the decoded token together with the unconsumed remainder of
/// `input` (which starts right after the token / closing quote).
///
/// # Errors
///
/// * `ERROR missing_argument` when `input` is empty or all whitespace.
/// * `ERROR invalid_argument` when a quoted token is never closed.
fn take_token(input: &str) -> Result<(String, &str), String> {
    let trimmed = input.trim_start();
    if trimmed.is_empty() {
        return Err("ERROR missing_argument".into());
    }

    // Bare token: everything up to the first whitespace character.
    let quoted_body = match trimmed.strip_prefix('"') {
        Some(body) => body,
        None => {
            let cut = trimmed.find(char::is_whitespace).unwrap_or(trimmed.len());
            let (token, rest) = trimmed.split_at(cut);
            return Ok((token.to_string(), rest));
        }
    };

    // Quoted token: decode escapes until the closing quote.
    let mut token = String::new();
    let mut chars = quoted_body.char_indices();
    while let Some((pos, ch)) = chars.next() {
        match ch {
            // The quote is one byte wide, so the remainder starts at `pos + 1`.
            '"' => return Ok((token, &quoted_body[pos + 1..])),
            '\\' => match chars.next() {
                Some((_, 'n')) => token.push('\n'),
                Some((_, 'r')) => token.push('\r'),
                Some((_, 't')) => token.push('\t'),
                Some((_, literal)) => token.push(literal),
                // Dangling backslash at end of input: the quote never closes.
                None => break,
            },
            plain => token.push(plain),
        }
    }

    Err("ERROR invalid_argument".into())
}
289
290fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
291    let (room_id, rest) = take_token(remainder)?;
292    let parsed = room_id
293        .parse::<u64>()
294        .map_err(|_| "ERROR invalid_room_id".to_string())?;
295    Ok((parsed, rest))
296}
297
298/// Attempt to connect to `addr`, send the `api_key` followed by a newline,
299/// and return the established `TcpStream` if successful.
300pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
301    let mut stream = TcpStream::connect(addr).await?;
302    stream.write_all(api_key.as_bytes()).await?;
303    stream.write_all(b"\n").await?;
304    stream.flush().await?;
305    Ok(stream)
306}
307
308/// Try connecting with retries and delay between attempts.
309pub async fn connect_with_retry(
310    addr: &str,
311    api_key: &str,
312    retries: usize,
313    delay: Duration,
314) -> Result<TcpStream, std::io::Error> {
315    for attempt in 0..=retries {
316        match connect_with_auth(addr, api_key).await {
317            Ok(s) => return Ok(s),
318            Err(e) => {
319                if attempt == retries {
320                    return Err(e);
321                }
322                sleep(delay).await;
323            }
324        }
325    }
326    unreachable!()
327}
328
329/// Send a single `Command` over the provided `TcpStream` (adds a trailing newline).
330pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
331    let s = format_command(cmd);
332    stream.write_all(s.as_bytes()).await?;
333    stream.write_all(b"\n").await?;
334    stream.flush().await?;
335    Ok(())
336}
337
338/// Start a background task that maintains a long-lived TCP connection to
339/// `addr`, authenticates with `api_key`, forwards received lines to the
340/// returned `mpsc::Receiver<String>`, and accepts outgoing `Command`s via
341/// the returned `mpsc::Sender<Command>`. The task will attempt reconnection on
342/// disconnect.
343pub fn start_long_lived_tcp(
344    addr: String,
345    api_key: String,
346) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
347    let (tx_in, rx_in) = mpsc::channel(128);
348    let (tx_cmd, rx_cmd) = mpsc::channel(128);
349
350    tokio::spawn(async move {
351        // rx_cmd is the receiver side that will yield outgoing commands.
352        let mut rx_cmd = rx_cmd;
353
354        loop {
355            match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
356                Ok(stream) => {
357                    let (read_half, mut write_half) = tokio::io::split(stream);
358                    let mut lines = BufReader::new(read_half).lines();
359
360                    loop {
361                        tokio::select! {
362                            res = lines.next_line() => {
363                                match res {
364                                    Ok(Some(line)) => {
365                                        if tx_in.send(line).await.is_err() {
366                                            // receiver dropped — stop background task
367                                            return;
368                                        }
369                                    }
370                                    Ok(None) => {
371                                        // connection closed by peer
372                                        break;
373                                    }
374                                    Err(_) => {
375                                        // read error, break to reconnect
376                                        break;
377                                    }
378                                }
379                            }
380                            maybe_cmd = rx_cmd.recv() => {
381                                match maybe_cmd {
382                                    Some(cmd) => {
383                                        let s = format_command(&cmd);
384                                        if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
385                                        if let Err(_) = write_half.write_all(b"\n").await { break; }
386                                        if let Err(_) = write_half.flush().await { break; }
387                                    }
388                                    None => {
389                                        // all senders dropped, no more outgoing commands
390                                        return;
391                                    }
392                                }
393                            }
394                        }
395                    }
396                    // loop will try to reconnect
397                }
398                Err(_) => {
399                    // failed to connect after retries — wait before retrying
400                    sleep(Duration::from_secs(1)).await;
401                }
402            }
403        }
404    });
405
406    (rx_in, tx_cmd)
407}
408
409/// A higher-level client wrapper that provides incoming/outgoing channels
410/// and a graceful shutdown handle for the background connection task.
411pub struct Client {
412    /// Incoming lines received from the server.
413    pub incoming: mpsc::Receiver<String>,
414    /// Send `Command` values to be written to the server.
415    pub outgoing: mpsc::Sender<Command>,
416    shutdown_tx: mpsc::Sender<()>,
417    handle: Option<JoinHandle<()>>,
418}
419
420impl Client {
421    /// Spawn a background task to maintain a long-lived connection to `addr`.
422    /// Returns a `Client` that can send commands and receive incoming lines.
423    pub fn start(addr: String, api_key: String) -> Self {
424        let (tx_in, rx_in) = mpsc::channel(128);
425        let (tx_cmd, rx_cmd) = mpsc::channel(128);
426        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
427
428        let handle = tokio::spawn(async move {
429            let mut rx_cmd = rx_cmd;
430
431            loop {
432                // If shutdown requested before connect, exit (non-blocking check)
433                if shutdown_rx.try_recv().is_ok() {
434                    return;
435                }
436
437                match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
438                    Ok(stream) => {
439                        let (read_half, mut write_half) = tokio::io::split(stream);
440                        let mut lines = BufReader::new(read_half).lines();
441
442                        loop {
443                            tokio::select! {
444                                _ = shutdown_rx.recv() => {
445                                    return;
446                                }
447                                res = lines.next_line() => {
448                                    match res {
449                                        Ok(Some(line)) => {
450                                            if tx_in.send(line).await.is_err() {
451                                                return;
452                                            }
453                                        }
454                                        Ok(None) => { break; }
455                                        Err(_) => { break; }
456                                    }
457                                }
458                                maybe_cmd = rx_cmd.recv() => {
459                                    match maybe_cmd {
460                                        Some(cmd) => {
461                                            let s = format_command(&cmd);
462                                            if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
463                                            if let Err(_) = write_half.write_all(b"\n").await { break; }
464                                            if let Err(_) = write_half.flush().await { break; }
465                                        }
466                                        None => {
467                                            // All senders dropped, exit background task
468                                            return;
469                                        }
470                                    }
471                                }
472                            }
473                        }
474                        // connection closed; try to reconnect
475                    }
476                    Err(_) => {
477                        // failed to connect after retries — wait or exit on shutdown
478                        tokio::select! {
479                            _ = shutdown_rx.recv() => return,
480                            _ = sleep(Duration::from_secs(1)) => {}
481                        }
482                    }
483                }
484            }
485        });
486
487        Client {
488            incoming: rx_in,
489            outgoing: tx_cmd,
490            shutdown_tx,
491            handle: Some(handle),
492        }
493    }
494
495    /// Asynchronously send a `Command` to the server via the background task.
496    pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
497        self.outgoing.send(cmd).await
498    }
499
500    /// Try to synchronously send a `Command` without awaiting.
501    pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
502        self.outgoing.try_send(cmd)
503    }
504
505    /// Gracefully shutdown the background task and wait for it to finish.
506    pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
507        let _ = self.shutdown_tx.send(()).await;
508        if let Some(handle) = self.handle.take() {
509            handle.await
510        } else {
511            Ok(())
512        }
513    }
514}
515
516impl Drop for Client {
517    fn drop(&mut self) {
518        // Best-effort: signal shutdown and abort the task so we don't leak it.
519        let _ = self.shutdown_tx.try_send(());
520        if let Some(handle) = self.handle.take() {
521            handle.abort();
522        }
523    }
524}
525
/// Report whether `s` must be written as a quoted token: it is empty, or it
/// contains whitespace, a double quote or a backslash.
fn needs_quote(s: &str) -> bool {
    if s.is_empty() {
        return true;
    }
    s.chars()
        .any(|ch| matches!(ch, '"' | '\\') || ch.is_whitespace())
}
529
/// Escape `s` for use inside a double-quoted token.
///
/// Backslash, double quote, newline, carriage return and tab become `\\`,
/// `\"`, `\n`, `\r` and `\t`; every other character is copied unchanged.
fn escape_token(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
544
545fn format_token(s: &str) -> String {
546    if needs_quote(s) {
547        format!("\"{}\"", escape_token(s))
548    } else {
549        s.to_string()
550    }
551}