Skip to main content

syncpond_protocol/
protocol.rs

1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
/// Command enum representing the app-level commands.
///
/// Every variant maps to exactly one single-line wire command. The keyword
/// shown on each variant is the one `parse_command` matches and
/// `format_command` emits; arguments are whitespace-separated tokens that may
/// be double-quoted.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    /// `ROOM.CREATE` — takes no arguments.
    RoomCreate,
    /// `ROOM.DELETE <room_id>`.
    RoomDelete(u64),
    /// `ROOM.LIST` — takes no arguments.
    RoomList,
    /// `ROOM.INFO <room_id>`.
    RoomInfo(u64),
    /// `ROOM.LABEL <room_id> <label>`; the parser rejects a blank label.
    RoomLabel(u64, String),
    /// `ROOM.FIND <label>`.
    RoomFind(String),
    /// `SET <room_id> <container> <key> <json>`; everything after the key is
    /// parsed as one JSON value.
    Set { room_id: u64, container: String, key: String, value: Value },
    /// `DEL <room_id> <container> <key>`.
    Del { room_id: u64, container: String, key: String },
    /// `GET <room_id> <container> <key>`.
    Get { room_id: u64, container: String, key: String },
    /// Server-only operations that operate on the reserved `server_only` container.
    ///
    /// `SERVER.SET <room_id> <key> <json>`.
    ServerSet { room_id: u64, key: String, value: Value },
    /// `SERVER.DEL <room_id> <key>`.
    ServerDel { room_id: u64, key: String },
    /// `SERVER.GET <room_id> <key>`.
    ServerGet { room_id: u64, key: String },
    /// `VERSION <room_id>`.
    Version(u64),
    /// `SET.JWTKEY <key>`.
    SetJwtKey(String),
    /// `TX.BEGIN <room_id>`.
    TxBegin(u64),
    /// `TX.END <room_id>`.
    TxEnd(u64),
    /// `TX.ABORT <room_id>`.
    TxAbort(u64),
    /// `TOKEN.GEN <room_id> [<container> ...]` — zero or more container names.
    TokenGen { room_id: u64, containers: Vec<String> },
    /// `SAVE <room_id>`.
    Save { room_id: u64 },
    /// `LOAD <room_id>`.
    Load { room_id: u64 },
    /// `PERSIST.SET <room_id> <container> <key>`.
    PersistSet { room_id: u64, container: String, key: String },
    /// `PERSIST.UNSET <room_id> <container> <key>`.
    PersistUnset { room_id: u64, container: String, key: String },
    /// `PERSIST.GET <room_id> <container> <key>`.
    PersistGet { room_id: u64, container: String, key: String },
}
36
/// Simple response representation. `Ok` may contain an optional payload string.
///
/// On the wire a response is a single line starting with `OK` or `ERROR`.
#[derive(Debug, Clone, PartialEq)]
pub enum Response {
    /// Success. `None` when nothing follows `OK` on the line, otherwise the
    /// remaining text of the line.
    Ok(Option<String>),
    /// Failure; holds the text that follows `ERROR` (possibly empty).
    Error(String),
}
43
44/// Parse an incoming command line into `Command`.
45/// Errors are returned as server-style strings like `ERROR missing_argument`.
46pub fn parse_command(line: &str) -> Result<Command, String> {
47    let mut remainder = line.trim_start();
48
49    let (cmd, rest) = match take_token(remainder) {
50        Ok((token, rest)) => (token, rest),
51        Err(e) => return Err(e),
52    };
53    remainder = rest;
54
55    match cmd.as_ref() {
56        "ROOM.CREATE" => {
57            if !remainder.trim().is_empty() {
58                return Err("ERROR extra_arguments".into());
59            }
60            Ok(Command::RoomCreate)
61        }
62        "ROOM.DELETE" => {
63            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
64            if !rest.trim().is_empty() {
65                return Err("ERROR extra_arguments".into());
66            }
67            Ok(Command::RoomDelete(room_id))
68        }
69        "ROOM.LIST" => {
70            if !remainder.trim().is_empty() {
71                return Err("ERROR extra_arguments".into());
72            }
73            Ok(Command::RoomList)
74        }
75        "ROOM.INFO" => {
76            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
77            if !rest.trim().is_empty() {
78                return Err("ERROR extra_arguments".into());
79            }
80            Ok(Command::RoomInfo(room_id))
81        }
82        "ROOM.LABEL" => {
83            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
84            let (label, rest) = take_token(rest)?;
85            if label.trim().is_empty() {
86                return Err("ERROR label_invalid".into());
87            }
88            if !rest.trim().is_empty() {
89                return Err("ERROR extra_arguments".into());
90            }
91            Ok(Command::RoomLabel(room_id, label))
92        }
93        "ROOM.FIND" => {
94            let (label, rest) = take_token(remainder)?;
95            if !rest.trim().is_empty() {
96                return Err("ERROR extra_arguments".into());
97            }
98            Ok(Command::RoomFind(label))
99        }
100        "SET" => {
101            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
102            let (container, rest) = take_token(rest)?;
103            let (key, rest) = take_token(rest)?;
104            let value_json = rest.trim_start();
105            if value_json.is_empty() {
106                return Err("ERROR missing_value".into());
107            }
108            let value: Value = match serde_json::from_str(value_json) {
109                Ok(v) => v,
110                Err(err) => return Err(format!("ERROR invalid_json {}", err)),
111            };
112            Ok(Command::Set { room_id, container, key, value })
113        }
114        "DEL" => {
115            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
116            let (container, rest) = take_token(rest)?;
117            let (key, rest) = take_token(rest)?;
118            if !rest.trim().is_empty() {
119                return Err("ERROR extra_arguments".into());
120            }
121            Ok(Command::Del { room_id, container, key })
122        }
123        "GET" => {
124            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
125            let (container, rest) = take_token(rest)?;
126            let (key, rest) = take_token(rest)?;
127            if !rest.trim().is_empty() {
128                return Err("ERROR extra_arguments".into());
129            }
130            Ok(Command::Get { room_id, container, key })
131        }
132        "SERVER.SET" => {
133            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
134            let (key, rest) = take_token(rest)?;
135            let value_json = rest.trim_start();
136            if value_json.is_empty() {
137                return Err("ERROR missing_value".into());
138            }
139            let value: Value = match serde_json::from_str(value_json) {
140                Ok(v) => v,
141                Err(err) => return Err(format!("ERROR invalid_json {}", err)),
142            };
143            Ok(Command::ServerSet { room_id, key, value })
144        }
145        "SERVER.DEL" => {
146            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
147            let (key, rest) = take_token(rest)?;
148            if !rest.trim().is_empty() {
149                return Err("ERROR extra_arguments".into());
150            }
151            Ok(Command::ServerDel { room_id, key })
152        }
153        "SERVER.GET" => {
154            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
155            let (key, rest) = take_token(rest)?;
156            if !rest.trim().is_empty() {
157                return Err("ERROR extra_arguments".into());
158            }
159            Ok(Command::ServerGet { room_id, key })
160        }
161        "VERSION" => {
162            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
163            if !rest.trim().is_empty() {
164                return Err("ERROR extra_arguments".into());
165            }
166            Ok(Command::Version(room_id))
167        }
168        "SET.JWTKEY" => {
169            let (key, rest) = take_token(remainder)?;
170            if !rest.trim().is_empty() {
171                return Err("ERROR extra_arguments".into());
172            }
173            Ok(Command::SetJwtKey(key))
174        }
175        "TX.BEGIN" => {
176            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
177            if !rest.trim().is_empty() {
178                return Err("ERROR extra_arguments".into());
179            }
180            Ok(Command::TxBegin(room_id))
181        }
182        "TX.END" => {
183            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
184            if !rest.trim().is_empty() {
185                return Err("ERROR extra_arguments".into());
186            }
187            Ok(Command::TxEnd(room_id))
188        }
189        "TX.ABORT" => {
190            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
191            if !rest.trim().is_empty() {
192                return Err("ERROR extra_arguments".into());
193            }
194            Ok(Command::TxAbort(room_id))
195        }
196        "TOKEN.GEN" => {
197            let (room_id_str, rest) = take_token(remainder)?;
198            let room_id = match room_id_str.parse::<u64>() {
199                Ok(id) => id,
200                Err(_) => return Err("ERROR invalid_room_id".into()),
201            };
202            let mut containers = Vec::new();
203            let mut leftover = rest;
204            while !leftover.trim().is_empty() {
205                let (tok, rem) = take_token(leftover)?;
206                containers.push(tok);
207                leftover = rem;
208            }
209            Ok(Command::TokenGen { room_id, containers })
210        }
211        "SAVE" => {
212            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
213            if !rest.trim().is_empty() {
214                return Err("ERROR extra_arguments".into());
215            }
216            Ok(Command::Save { room_id })
217        }
218        "LOAD" => {
219            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
220            if !rest.trim().is_empty() {
221                return Err("ERROR extra_arguments".into());
222            }
223            Ok(Command::Load { room_id })
224        }
225        "PERSIST.SET" => {
226            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
227            let (container, rest) = take_token(rest)?;
228            let (key, rest) = take_token(rest)?;
229            if !rest.trim().is_empty() {
230                return Err("ERROR extra_arguments".into());
231            }
232            Ok(Command::PersistSet { room_id, container, key })
233        }
234        "PERSIST.UNSET" => {
235            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
236            let (container, rest) = take_token(rest)?;
237            let (key, rest) = take_token(rest)?;
238            if !rest.trim().is_empty() {
239                return Err("ERROR extra_arguments".into());
240            }
241            Ok(Command::PersistUnset { room_id, container, key })
242        }
243        "PERSIST.GET" => {
244            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
245            let (container, rest) = take_token(rest)?;
246            let (key, rest) = take_token(rest)?;
247            if !rest.trim().is_empty() {
248                return Err("ERROR extra_arguments".into());
249            }
250            Ok(Command::PersistGet { room_id, container, key })
251        }
252        _ => Err("ERROR unknown_command".into()),
253    }
254}
255
256/// Format a `Command` into a single-line string suitable for sending to the server.
257pub fn format_command(cmd: &Command) -> String {
258    match cmd {
259        Command::RoomCreate => "ROOM.CREATE".into(),
260        Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
261        Command::RoomList => "ROOM.LIST".into(),
262        Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
263        Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
264        Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
265        Command::Set { room_id, container, key, value } => {
266            let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
267            format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
268        }
269        Command::Del { room_id, container, key } => {
270            format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
271        }
272        Command::Get { room_id, container, key } => {
273            format!("GET {} {} {}", room_id, format_token(container), format_token(key))
274        }
275        Command::Version(id) => format!("VERSION {}", id),
276        Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
277        Command::ServerSet { room_id, key, value } => {
278            let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
279            format!("SERVER.SET {} {} {}", room_id, format_token(key), v)
280        }
281        Command::TxBegin(id) => format!("TX.BEGIN {}", id),
282        Command::TxEnd(id) => format!("TX.END {}", id),
283        Command::TxAbort(id) => format!("TX.ABORT {}", id),
284        Command::TokenGen { room_id, containers } => {
285            let mut parts = vec![room_id.to_string()];
286            parts.extend(containers.iter().map(|c| format_token(c)));
287            format!("TOKEN.GEN {}", parts.join(" "))
288        }
289        Command::Save { room_id } => {
290            format!("SAVE {}", room_id)
291        }
292        Command::Load { room_id } => {
293            format!("LOAD {}", room_id)
294        }
295        Command::PersistSet { room_id, container, key } => {
296            format!("PERSIST.SET {} {} {}", room_id, format_token(container), format_token(key))
297        }
298        Command::PersistUnset { room_id, container, key } => {
299            format!("PERSIST.UNSET {} {} {}", room_id, format_token(container), format_token(key))
300        }
301        Command::PersistGet { room_id, container, key } => {
302            format!("PERSIST.GET {} {} {}", room_id, format_token(container), format_token(key))
303        }
304        Command::ServerDel { room_id, key } => {
305            format!("SERVER.DEL {} {}", room_id, format_token(key))
306        }
307        Command::ServerGet { room_id, key } => {
308            format!("SERVER.GET {} {}", room_id, format_token(key))
309        }
310    }
311}
312
313/// Parse a server response line into `Response`.
314pub fn parse_response(line: &str) -> Result<Response, String> {
315    let s = line.trim();
316    if s.starts_with("OK") {
317        let rest = s[2..].trim_start();
318        if rest.is_empty() {
319            Ok(Response::Ok(None))
320        } else {
321            Ok(Response::Ok(Some(rest.to_string())))
322        }
323    } else if s.starts_with("ERROR") {
324        let rest = s[5..].trim_start();
325        Ok(Response::Error(rest.to_string()))
326    } else {
327        Err("ERROR invalid_response".into())
328    }
329}
330
331/// Format a `Response` back to a line.
332pub fn format_response(resp: &Response) -> String {
333    match resp {
334        Response::Ok(None) => "OK".into(),
335        Response::Ok(Some(p)) => format!("OK {}", p),
336        Response::Error(msg) => format!("ERROR {}", msg),
337    }
338}
339
340// ----------------- helpers -----------------
341
/// Split the next token off the front of `input`.
///
/// Leading whitespace is skipped. A token is either:
///
/// * a bare run of non-whitespace characters, or
/// * a double-quoted string in which `\\`, `\"`, `\n`, `\r` and `\t` are
///   decoded and any other escaped character stands for itself.
///
/// Returns the decoded token together with the unparsed remainder, which
/// starts immediately after the token or its closing quote.
///
/// Fails with `ERROR missing_argument` when nothing but whitespace is left,
/// and with `ERROR invalid_argument` when a quoted token is never closed.
fn take_token(input: &str) -> Result<(String, &str), String> {
    let input = input.trim_start();
    if input.is_empty() {
        return Err("ERROR missing_argument".into());
    }

    let quoted_body = match input.strip_prefix('"') {
        Some(body) => body,
        None => {
            // Bare token: runs up to the first whitespace or the end of input.
            let split_at = input.find(char::is_whitespace).unwrap_or(input.len());
            let (token, rest) = input.split_at(split_at);
            return Ok((token.to_string(), rest));
        }
    };

    let mut token = String::new();
    let mut chars = quoted_body.char_indices();
    while let Some((offset, ch)) = chars.next() {
        match ch {
            // Closing quote: it is one byte wide, so the rest starts right after it.
            '"' => return Ok((token, &quoted_body[offset + 1..])),
            '\\' => match chars.next() {
                Some((_, escaped)) => token.push(match escaped {
                    'n' => '\n',
                    'r' => '\r',
                    't' => '\t',
                    literal => literal,
                }),
                // A backslash at the very end means the string was never closed.
                None => break,
            },
            plain => token.push(plain),
        }
    }

    Err("ERROR invalid_argument".into())
}
393
394fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
395    let (room_id, rest) = take_token(remainder)?;
396    let parsed = room_id
397        .parse::<u64>()
398        .map_err(|_| "ERROR invalid_room_id".to_string())?;
399    Ok((parsed, rest))
400}
401
402/// Attempt to connect to `addr`, send the `api_key` followed by a newline,
403/// and return the established `TcpStream` if successful.
404pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
405    let mut stream = TcpStream::connect(addr).await?;
406    stream.write_all(api_key.as_bytes()).await?;
407    stream.write_all(b"\n").await?;
408    stream.flush().await?;
409    Ok(stream)
410}
411
412/// Try connecting with retries and delay between attempts.
413pub async fn connect_with_retry(
414    addr: &str,
415    api_key: &str,
416    retries: usize,
417    delay: Duration,
418) -> Result<TcpStream, std::io::Error> {
419    for attempt in 0..=retries {
420        match connect_with_auth(addr, api_key).await {
421            Ok(s) => return Ok(s),
422            Err(e) => {
423                if attempt == retries {
424                    return Err(e);
425                }
426                sleep(delay).await;
427            }
428        }
429    }
430    unreachable!()
431}
432
433/// Send a single `Command` over the provided `TcpStream` (adds a trailing newline).
434pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
435    let s = format_command(cmd);
436    stream.write_all(s.as_bytes()).await?;
437    stream.write_all(b"\n").await?;
438    stream.flush().await?;
439    Ok(())
440}
441
442/// Start a background task that maintains a long-lived TCP connection to
443/// `addr`, authenticates with `api_key`, forwards received lines to the
444/// returned `mpsc::Receiver<String>`, and accepts outgoing `Command`s via
445/// the returned `mpsc::Sender<Command>`. The task will attempt reconnection on
446/// disconnect.
447pub fn start_long_lived_tcp(
448    addr: String,
449    api_key: String,
450) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
451    let (tx_in, rx_in) = mpsc::channel(128);
452    let (tx_cmd, rx_cmd) = mpsc::channel(128);
453
454    tokio::spawn(async move {
455        // rx_cmd is the receiver side that will yield outgoing commands.
456        let mut rx_cmd = rx_cmd;
457
458        loop {
459            match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
460                Ok(stream) => {
461                    let (read_half, mut write_half) = tokio::io::split(stream);
462                    let mut lines = BufReader::new(read_half).lines();
463
464                    loop {
465                        tokio::select! {
466                            res = lines.next_line() => {
467                                match res {
468                                    Ok(Some(line)) => {
469                                        if tx_in.send(line).await.is_err() {
470                                            // receiver dropped — stop background task
471                                            return;
472                                        }
473                                    }
474                                    Ok(None) => {
475                                        // connection closed by peer
476                                        break;
477                                    }
478                                    Err(_) => {
479                                        // read error, break to reconnect
480                                        break;
481                                    }
482                                }
483                            }
484                            maybe_cmd = rx_cmd.recv() => {
485                                match maybe_cmd {
486                                    Some(cmd) => {
487                                        let s = format_command(&cmd);
488                                        if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
489                                        if let Err(_) = write_half.write_all(b"\n").await { break; }
490                                        if let Err(_) = write_half.flush().await { break; }
491                                    }
492                                    None => {
493                                        // all senders dropped, no more outgoing commands
494                                        return;
495                                    }
496                                }
497                            }
498                        }
499                    }
500                    // loop will try to reconnect
501                }
502                Err(_) => {
503                    // failed to connect after retries — wait before retrying
504                    sleep(Duration::from_secs(1)).await;
505                }
506            }
507        }
508    });
509
510    (rx_in, tx_cmd)
511}
512
/// A higher-level client wrapper that provides incoming/outgoing channels
/// and a graceful shutdown handle for the background connection task.
///
/// Created with `Client::start`. Dropping a `Client` aborts its background
/// task; `Client::shutdown` signals the task and waits for it to finish.
pub struct Client {
    /// Incoming lines received from the server.
    pub incoming: mpsc::Receiver<String>,
    /// Send `Command` values to be written to the server.
    pub outgoing: mpsc::Sender<Command>,
    /// Signals the background task to stop; used by `shutdown` and `Drop`.
    shutdown_tx: mpsc::Sender<()>,
    /// Handle of the background task. `shutdown` and `Drop` take it out of the
    /// `Option` to await or abort the task.
    handle: Option<JoinHandle<()>>,
}
523
524impl Client {
525    /// Spawn a background task to maintain a long-lived connection to `addr`.
526    /// Returns a `Client` that can send commands and receive incoming lines.
527    pub fn start(addr: String, api_key: String) -> Self {
528        let (tx_in, rx_in) = mpsc::channel(128);
529        let (tx_cmd, rx_cmd) = mpsc::channel(128);
530        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
531
532        let handle = tokio::spawn(async move {
533            let mut rx_cmd = rx_cmd;
534
535            loop {
536                // If shutdown requested before connect, exit (non-blocking check)
537                if shutdown_rx.try_recv().is_ok() {
538                    return;
539                }
540
541                match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
542                    Ok(stream) => {
543                        let (read_half, mut write_half) = tokio::io::split(stream);
544                        let mut lines = BufReader::new(read_half).lines();
545
546                        loop {
547                            tokio::select! {
548                                _ = shutdown_rx.recv() => {
549                                    return;
550                                }
551                                res = lines.next_line() => {
552                                    match res {
553                                        Ok(Some(line)) => {
554                                            if tx_in.send(line).await.is_err() {
555                                                return;
556                                            }
557                                        }
558                                        Ok(None) => { break; }
559                                        Err(_) => { break; }
560                                    }
561                                }
562                                maybe_cmd = rx_cmd.recv() => {
563                                    match maybe_cmd {
564                                        Some(cmd) => {
565                                            let s = format_command(&cmd);
566                                            if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
567                                            if let Err(_) = write_half.write_all(b"\n").await { break; }
568                                            if let Err(_) = write_half.flush().await { break; }
569                                        }
570                                        None => {
571                                            // All senders dropped, exit background task
572                                            return;
573                                        }
574                                    }
575                                }
576                            }
577                        }
578                        // connection closed; try to reconnect
579                    }
580                    Err(_) => {
581                        // failed to connect after retries — wait or exit on shutdown
582                        tokio::select! {
583                            _ = shutdown_rx.recv() => return,
584                            _ = sleep(Duration::from_secs(1)) => {}
585                        }
586                    }
587                }
588            }
589        });
590
591        Client {
592            incoming: rx_in,
593            outgoing: tx_cmd,
594            shutdown_tx,
595            handle: Some(handle),
596        }
597    }
598
599    /// Asynchronously send a `Command` to the server via the background task.
600    pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
601        self.outgoing.send(cmd).await
602    }
603
604    /// Try to synchronously send a `Command` without awaiting.
605    pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
606        self.outgoing.try_send(cmd)
607    }
608
609    /// Gracefully shutdown the background task and wait for it to finish.
610    pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
611        let _ = self.shutdown_tx.send(()).await;
612        if let Some(handle) = self.handle.take() {
613            handle.await
614        } else {
615            Ok(())
616        }
617    }
618}
619
620impl Drop for Client {
621    fn drop(&mut self) {
622        // Best-effort: signal shutdown and abort the task so we don't leak it.
623        let _ = self.shutdown_tx.try_send(());
624        if let Some(handle) = self.handle.take() {
625            handle.abort();
626        }
627    }
628}
629
/// Reports whether `s` must be wrapped in double quotes to survive as one
/// token: it is empty, or contains whitespace, a double quote or a backslash.
fn needs_quote(s: &str) -> bool {
    if s.is_empty() {
        return true;
    }
    s.chars().any(|ch| matches!(ch, '"' | '\\') || ch.is_whitespace())
}
633
/// Backslash-escape the characters that are special inside a quoted token.
///
/// `\`, `"`, newline, carriage return and tab become `\\`, `\"`, `\n`, `\r`
/// and `\t`; every other character is copied unchanged. The surrounding
/// quotes are not added here.
fn escape_token(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => "\\\\",
            '"' => "\\\"",
            '\n' => "\\n",
            '\r' => "\\r",
            '\t' => "\\t",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        escaped.push_str(replacement);
    }
    escaped
}
648
649fn format_token(s: &str) -> String {
650    if needs_quote(s) {
651        format!("\"{}\"", escape_token(s))
652    } else {
653        s.to_string()
654    }
655}