Skip to main content

syncpond_protocol/
protocol.rs

1use serde_json::Value;
2use tokio::net::TcpStream;
3use tokio::io::{AsyncWriteExt, AsyncBufReadExt, BufReader};
4use tokio::sync::mpsc;
5use tokio::time::{sleep, Duration};
6use tokio::task::JoinHandle;
7
8/// Command enum representing the app-level commands.
9#[derive(Debug, Clone, PartialEq)]
10pub enum Command {
11    RoomCreate,
12    RoomDelete(u64),
13    RoomList,
14    RoomInfo(u64),
15    RoomLabel(u64, String),
16    RoomFind(String),
17    Set { room_id: u64, container: String, key: String, value: Value },
18    Del { room_id: u64, container: String, key: String },
19    Get { room_id: u64, container: String, key: String },
20    Version(u64),
21    SetJwtKey(String),
22    TxBegin(u64),
23    TxEnd(u64),
24    TxAbort(u64),
25    TokenGen { room_id: u64, containers: Vec<String> },
26    Save { room_id: u64 },
27    Load { room_id: u64 },
28    PersistSet { room_id: u64, container: String, key: String },
29    PersistUnset { room_id: u64, container: String, key: String },
30    PersistGet { room_id: u64, container: String, key: String },
31}
32
/// Outcome of a command as carried by a single response line.
#[derive(Debug, Clone, PartialEq)]
pub enum Response {
    /// An `OK` line. Holds the text that followed the keyword, or `None`
    /// when nothing followed it.
    Ok(Option<String>),
    /// An `ERROR` line. Holds the text that followed the keyword (may be
    /// empty).
    Error(String),
}
39
40/// Parse an incoming command line into `Command`.
41/// Errors are returned as server-style strings like `ERROR missing_argument`.
42pub fn parse_command(line: &str) -> Result<Command, String> {
43    let mut remainder = line.trim_start();
44
45    let (cmd, rest) = match take_token(remainder) {
46        Ok((token, rest)) => (token, rest),
47        Err(e) => return Err(e),
48    };
49    remainder = rest;
50
51    match cmd.as_ref() {
52        "ROOM.CREATE" => {
53            if !remainder.trim().is_empty() {
54                return Err("ERROR extra_arguments".into());
55            }
56            Ok(Command::RoomCreate)
57        }
58        "ROOM.DELETE" => {
59            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
60            if !rest.trim().is_empty() {
61                return Err("ERROR extra_arguments".into());
62            }
63            Ok(Command::RoomDelete(room_id))
64        }
65        "ROOM.LIST" => {
66            if !remainder.trim().is_empty() {
67                return Err("ERROR extra_arguments".into());
68            }
69            Ok(Command::RoomList)
70        }
71        "ROOM.INFO" => {
72            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
73            if !rest.trim().is_empty() {
74                return Err("ERROR extra_arguments".into());
75            }
76            Ok(Command::RoomInfo(room_id))
77        }
78        "ROOM.LABEL" => {
79            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
80            let (label, rest) = take_token(rest)?;
81            if label.trim().is_empty() {
82                return Err("ERROR label_invalid".into());
83            }
84            if !rest.trim().is_empty() {
85                return Err("ERROR extra_arguments".into());
86            }
87            Ok(Command::RoomLabel(room_id, label))
88        }
89        "ROOM.FIND" => {
90            let (label, rest) = take_token(remainder)?;
91            if !rest.trim().is_empty() {
92                return Err("ERROR extra_arguments".into());
93            }
94            Ok(Command::RoomFind(label))
95        }
96        "SET" => {
97            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
98            let (container, rest) = take_token(rest)?;
99            let (key, rest) = take_token(rest)?;
100            let value_json = rest.trim_start();
101            if value_json.is_empty() {
102                return Err("ERROR missing_value".into());
103            }
104            let value: Value = match serde_json::from_str(value_json) {
105                Ok(v) => v,
106                Err(err) => return Err(format!("ERROR invalid_json {}", err)),
107            };
108            Ok(Command::Set { room_id, container, key, value })
109        }
110        "DEL" => {
111            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
112            let (container, rest) = take_token(rest)?;
113            let (key, rest) = take_token(rest)?;
114            if !rest.trim().is_empty() {
115                return Err("ERROR extra_arguments".into());
116            }
117            Ok(Command::Del { room_id, container, key })
118        }
119        "GET" => {
120            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
121            let (container, rest) = take_token(rest)?;
122            let (key, rest) = take_token(rest)?;
123            if !rest.trim().is_empty() {
124                return Err("ERROR extra_arguments".into());
125            }
126            Ok(Command::Get { room_id, container, key })
127        }
128        "VERSION" => {
129            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
130            if !rest.trim().is_empty() {
131                return Err("ERROR extra_arguments".into());
132            }
133            Ok(Command::Version(room_id))
134        }
135        "SET.JWTKEY" => {
136            let (key, rest) = take_token(remainder)?;
137            if !rest.trim().is_empty() {
138                return Err("ERROR extra_arguments".into());
139            }
140            Ok(Command::SetJwtKey(key))
141        }
142        "TX.BEGIN" => {
143            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
144            if !rest.trim().is_empty() {
145                return Err("ERROR extra_arguments".into());
146            }
147            Ok(Command::TxBegin(room_id))
148        }
149        "TX.END" => {
150            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
151            if !rest.trim().is_empty() {
152                return Err("ERROR extra_arguments".into());
153            }
154            Ok(Command::TxEnd(room_id))
155        }
156        "TX.ABORT" => {
157            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
158            if !rest.trim().is_empty() {
159                return Err("ERROR extra_arguments".into());
160            }
161            Ok(Command::TxAbort(room_id))
162        }
163        "TOKEN.GEN" => {
164            let (room_id_str, rest) = take_token(remainder)?;
165            let room_id = match room_id_str.parse::<u64>() {
166                Ok(id) => id,
167                Err(_) => return Err("ERROR invalid_room_id".into()),
168            };
169            let mut containers = Vec::new();
170            let mut leftover = rest;
171            while !leftover.trim().is_empty() {
172                let (tok, rem) = take_token(leftover)?;
173                containers.push(tok);
174                leftover = rem;
175            }
176            Ok(Command::TokenGen { room_id, containers })
177        }
178        "SAVE" => {
179            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
180            if !rest.trim().is_empty() {
181                return Err("ERROR extra_arguments".into());
182            }
183            Ok(Command::Save { room_id })
184        }
185        "LOAD" => {
186            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
187            if !rest.trim().is_empty() {
188                return Err("ERROR extra_arguments".into());
189            }
190            Ok(Command::Load { room_id })
191        }
192        "PERSIST.SET" => {
193            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
194            let (container, rest) = take_token(rest)?;
195            let (key, rest) = take_token(rest)?;
196            if !rest.trim().is_empty() {
197                return Err("ERROR extra_arguments".into());
198            }
199            Ok(Command::PersistSet { room_id, container, key })
200        }
201        "PERSIST.UNSET" => {
202            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
203            let (container, rest) = take_token(rest)?;
204            let (key, rest) = take_token(rest)?;
205            if !rest.trim().is_empty() {
206                return Err("ERROR extra_arguments".into());
207            }
208            Ok(Command::PersistUnset { room_id, container, key })
209        }
210        "PERSIST.GET" => {
211            let (room_id, rest) = parse_room_id_from_remainder(remainder)?;
212            let (container, rest) = take_token(rest)?;
213            let (key, rest) = take_token(rest)?;
214            if !rest.trim().is_empty() {
215                return Err("ERROR extra_arguments".into());
216            }
217            Ok(Command::PersistGet { room_id, container, key })
218        }
219        _ => Err("ERROR unknown_command".into()),
220    }
221}
222
223/// Format a `Command` into a single-line string suitable for sending to the server.
224pub fn format_command(cmd: &Command) -> String {
225    match cmd {
226        Command::RoomCreate => "ROOM.CREATE".into(),
227        Command::RoomDelete(id) => format!("ROOM.DELETE {}", id),
228        Command::RoomList => "ROOM.LIST".into(),
229        Command::RoomInfo(id) => format!("ROOM.INFO {}", id),
230        Command::RoomLabel(id, label) => format!("ROOM.LABEL {} {}", id, format_token(label)),
231        Command::RoomFind(label) => format!("ROOM.FIND {}", format_token(label)),
232        Command::Set { room_id, container, key, value } => {
233            let v = serde_json::to_string(value).unwrap_or_else(|_| "null".into());
234            format!("SET {} {} {} {}", room_id, format_token(container), format_token(key), v)
235        }
236        Command::Del { room_id, container, key } => {
237            format!("DEL {} {} {}", room_id, format_token(container), format_token(key))
238        }
239        Command::Get { room_id, container, key } => {
240            format!("GET {} {} {}", room_id, format_token(container), format_token(key))
241        }
242        Command::Version(id) => format!("VERSION {}", id),
243        Command::SetJwtKey(k) => format!("SET.JWTKEY {}", format_token(k)),
244        Command::TxBegin(id) => format!("TX.BEGIN {}", id),
245        Command::TxEnd(id) => format!("TX.END {}", id),
246        Command::TxAbort(id) => format!("TX.ABORT {}", id),
247        Command::TokenGen { room_id, containers } => {
248            let mut parts = vec![room_id.to_string()];
249            parts.extend(containers.iter().map(|c| format_token(c)));
250            format!("TOKEN.GEN {}", parts.join(" "))
251        }
252        Command::Save { room_id } => {
253            format!("SAVE {}", room_id)
254        }
255        Command::Load { room_id } => {
256            format!("LOAD {}", room_id)
257        }
258        Command::PersistSet { room_id, container, key } => {
259            format!("PERSIST.SET {} {} {}", room_id, format_token(container), format_token(key))
260        }
261        Command::PersistUnset { room_id, container, key } => {
262            format!("PERSIST.UNSET {} {} {}", room_id, format_token(container), format_token(key))
263        }
264        Command::PersistGet { room_id, container, key } => {
265            format!("PERSIST.GET {} {} {}", room_id, format_token(container), format_token(key))
266        }
267    }
268}
269
270/// Parse a server response line into `Response`.
271pub fn parse_response(line: &str) -> Result<Response, String> {
272    let s = line.trim();
273    if s.starts_with("OK") {
274        let rest = s[2..].trim_start();
275        if rest.is_empty() {
276            Ok(Response::Ok(None))
277        } else {
278            Ok(Response::Ok(Some(rest.to_string())))
279        }
280    } else if s.starts_with("ERROR") {
281        let rest = s[5..].trim_start();
282        Ok(Response::Error(rest.to_string()))
283    } else {
284        Err("ERROR invalid_response".into())
285    }
286}
287
288/// Format a `Response` back to a line.
289pub fn format_response(resp: &Response) -> String {
290    match resp {
291        Response::Ok(None) => "OK".into(),
292        Response::Ok(Some(p)) => format!("OK {}", p),
293        Response::Error(msg) => format!("ERROR {}", msg),
294    }
295}
296
297// ----------------- helpers -----------------
298
/// Split the next token off the front of `input`.
///
/// Leading whitespace is skipped. A token is either:
/// - a bare run of non-whitespace characters, or
/// - a double-quoted string in which a backslash escapes the next character:
///   `\n`, `\r` and `\t` become newline, carriage return and tab; any other
///   escaped character (including `\\` and `\"`) stands for itself.
///
/// Returns the decoded token and the unconsumed remainder of `input` (which
/// starts right after the token or its closing quote).
///
/// # Errors
///
/// - `ERROR missing_argument` when `input` is empty or all whitespace.
/// - `ERROR invalid_argument` when a quoted token has no closing quote.
fn take_token(input: &str) -> Result<(String, &str), String> {
    let trimmed = input.trim_start();
    if trimmed.is_empty() {
        return Err("ERROR missing_argument".into());
    }

    match trimmed.strip_prefix('"') {
        // Bare token: runs up to the first whitespace character, or to the end.
        None => {
            let cut = trimmed.find(char::is_whitespace).unwrap_or(trimmed.len());
            let (token, rest) = trimmed.split_at(cut);
            Ok((token.to_string(), rest))
        }
        // Quoted token: decode escapes until the closing quote.
        Some(body) => {
            let mut token = String::new();
            let mut chars = body.char_indices();
            while let Some((pos, ch)) = chars.next() {
                match ch {
                    // The quote is one byte wide, so the remainder starts at pos + 1.
                    '"' => return Ok((token, &body[pos + 1..])),
                    '\\' => match chars.next() {
                        Some((_, 'n')) => token.push('\n'),
                        Some((_, 'r')) => token.push('\r'),
                        Some((_, 't')) => token.push('\t'),
                        Some((_, literal)) => token.push(literal),
                        // Dangling backslash at end of input: unterminated.
                        None => break,
                    },
                    plain => token.push(plain),
                }
            }
            Err("ERROR invalid_argument".into())
        }
    }
}
350
351fn parse_room_id_from_remainder(remainder: &str) -> Result<(u64, &str), String> {
352    let (room_id, rest) = take_token(remainder)?;
353    let parsed = room_id
354        .parse::<u64>()
355        .map_err(|_| "ERROR invalid_room_id".to_string())?;
356    Ok((parsed, rest))
357}
358
359/// Attempt to connect to `addr`, send the `api_key` followed by a newline,
360/// and return the established `TcpStream` if successful.
361pub async fn connect_with_auth(addr: &str, api_key: &str) -> Result<TcpStream, std::io::Error> {
362    let mut stream = TcpStream::connect(addr).await?;
363    stream.write_all(api_key.as_bytes()).await?;
364    stream.write_all(b"\n").await?;
365    stream.flush().await?;
366    Ok(stream)
367}
368
369/// Try connecting with retries and delay between attempts.
370pub async fn connect_with_retry(
371    addr: &str,
372    api_key: &str,
373    retries: usize,
374    delay: Duration,
375) -> Result<TcpStream, std::io::Error> {
376    for attempt in 0..=retries {
377        match connect_with_auth(addr, api_key).await {
378            Ok(s) => return Ok(s),
379            Err(e) => {
380                if attempt == retries {
381                    return Err(e);
382                }
383                sleep(delay).await;
384            }
385        }
386    }
387    unreachable!()
388}
389
390/// Send a single `Command` over the provided `TcpStream` (adds a trailing newline).
391pub async fn send_command(stream: &mut TcpStream, cmd: &Command) -> Result<(), std::io::Error> {
392    let s = format_command(cmd);
393    stream.write_all(s.as_bytes()).await?;
394    stream.write_all(b"\n").await?;
395    stream.flush().await?;
396    Ok(())
397}
398
399/// Start a background task that maintains a long-lived TCP connection to
400/// `addr`, authenticates with `api_key`, forwards received lines to the
401/// returned `mpsc::Receiver<String>`, and accepts outgoing `Command`s via
402/// the returned `mpsc::Sender<Command>`. The task will attempt reconnection on
403/// disconnect.
404pub fn start_long_lived_tcp(
405    addr: String,
406    api_key: String,
407) -> (mpsc::Receiver<String>, mpsc::Sender<Command>) {
408    let (tx_in, rx_in) = mpsc::channel(128);
409    let (tx_cmd, rx_cmd) = mpsc::channel(128);
410
411    tokio::spawn(async move {
412        // rx_cmd is the receiver side that will yield outgoing commands.
413        let mut rx_cmd = rx_cmd;
414
415        loop {
416            match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
417                Ok(stream) => {
418                    let (read_half, mut write_half) = tokio::io::split(stream);
419                    let mut lines = BufReader::new(read_half).lines();
420
421                    loop {
422                        tokio::select! {
423                            res = lines.next_line() => {
424                                match res {
425                                    Ok(Some(line)) => {
426                                        if tx_in.send(line).await.is_err() {
427                                            // receiver dropped — stop background task
428                                            return;
429                                        }
430                                    }
431                                    Ok(None) => {
432                                        // connection closed by peer
433                                        break;
434                                    }
435                                    Err(_) => {
436                                        // read error, break to reconnect
437                                        break;
438                                    }
439                                }
440                            }
441                            maybe_cmd = rx_cmd.recv() => {
442                                match maybe_cmd {
443                                    Some(cmd) => {
444                                        let s = format_command(&cmd);
445                                        if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
446                                        if let Err(_) = write_half.write_all(b"\n").await { break; }
447                                        if let Err(_) = write_half.flush().await { break; }
448                                    }
449                                    None => {
450                                        // all senders dropped, no more outgoing commands
451                                        return;
452                                    }
453                                }
454                            }
455                        }
456                    }
457                    // loop will try to reconnect
458                }
459                Err(_) => {
460                    // failed to connect after retries — wait before retrying
461                    sleep(Duration::from_secs(1)).await;
462                }
463            }
464        }
465    });
466
467    (rx_in, tx_cmd)
468}
469
470/// A higher-level client wrapper that provides incoming/outgoing channels
471/// and a graceful shutdown handle for the background connection task.
472pub struct Client {
473    /// Incoming lines received from the server.
474    pub incoming: mpsc::Receiver<String>,
475    /// Send `Command` values to be written to the server.
476    pub outgoing: mpsc::Sender<Command>,
477    shutdown_tx: mpsc::Sender<()>,
478    handle: Option<JoinHandle<()>>,
479}
480
481impl Client {
482    /// Spawn a background task to maintain a long-lived connection to `addr`.
483    /// Returns a `Client` that can send commands and receive incoming lines.
484    pub fn start(addr: String, api_key: String) -> Self {
485        let (tx_in, rx_in) = mpsc::channel(128);
486        let (tx_cmd, rx_cmd) = mpsc::channel(128);
487        let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
488
489        let handle = tokio::spawn(async move {
490            let mut rx_cmd = rx_cmd;
491
492            loop {
493                // If shutdown requested before connect, exit (non-blocking check)
494                if shutdown_rx.try_recv().is_ok() {
495                    return;
496                }
497
498                match connect_with_retry(&addr, &api_key, 3, Duration::from_secs(1)).await {
499                    Ok(stream) => {
500                        let (read_half, mut write_half) = tokio::io::split(stream);
501                        let mut lines = BufReader::new(read_half).lines();
502
503                        loop {
504                            tokio::select! {
505                                _ = shutdown_rx.recv() => {
506                                    return;
507                                }
508                                res = lines.next_line() => {
509                                    match res {
510                                        Ok(Some(line)) => {
511                                            if tx_in.send(line).await.is_err() {
512                                                return;
513                                            }
514                                        }
515                                        Ok(None) => { break; }
516                                        Err(_) => { break; }
517                                    }
518                                }
519                                maybe_cmd = rx_cmd.recv() => {
520                                    match maybe_cmd {
521                                        Some(cmd) => {
522                                            let s = format_command(&cmd);
523                                            if let Err(_) = write_half.write_all(s.as_bytes()).await { break; }
524                                            if let Err(_) = write_half.write_all(b"\n").await { break; }
525                                            if let Err(_) = write_half.flush().await { break; }
526                                        }
527                                        None => {
528                                            // All senders dropped, exit background task
529                                            return;
530                                        }
531                                    }
532                                }
533                            }
534                        }
535                        // connection closed; try to reconnect
536                    }
537                    Err(_) => {
538                        // failed to connect after retries — wait or exit on shutdown
539                        tokio::select! {
540                            _ = shutdown_rx.recv() => return,
541                            _ = sleep(Duration::from_secs(1)) => {}
542                        }
543                    }
544                }
545            }
546        });
547
548        Client {
549            incoming: rx_in,
550            outgoing: tx_cmd,
551            shutdown_tx,
552            handle: Some(handle),
553        }
554    }
555
556    /// Asynchronously send a `Command` to the server via the background task.
557    pub async fn send(&self, cmd: Command) -> Result<(), mpsc::error::SendError<Command>> {
558        self.outgoing.send(cmd).await
559    }
560
561    /// Try to synchronously send a `Command` without awaiting.
562    pub fn try_send(&self, cmd: Command) -> Result<(), mpsc::error::TrySendError<Command>> {
563        self.outgoing.try_send(cmd)
564    }
565
566    /// Gracefully shutdown the background task and wait for it to finish.
567    pub async fn shutdown(mut self) -> Result<(), tokio::task::JoinError> {
568        let _ = self.shutdown_tx.send(()).await;
569        if let Some(handle) = self.handle.take() {
570            handle.await
571        } else {
572            Ok(())
573        }
574    }
575}
576
577impl Drop for Client {
578    fn drop(&mut self) {
579        // Best-effort: signal shutdown and abort the task so we don't leak it.
580        let _ = self.shutdown_tx.try_send(());
581        if let Some(handle) = self.handle.take() {
582            handle.abort();
583        }
584    }
585}
586
/// Report whether `s` must be written as a quoted token.
///
/// That is the case when `s` is empty or contains whitespace, a double quote,
/// or a backslash.
fn needs_quote(s: &str) -> bool {
    if s.is_empty() {
        return true;
    }
    s.contains(|ch: char| ch == '"' || ch == '\\' || ch.is_whitespace())
}
590
/// Escape `s` for use inside a double-quoted token.
///
/// Backslash and double quote get a backslash prefix; newline, carriage return
/// and tab are written as `\n`, `\r` and `\t`. Every other character is copied
/// unchanged. The surrounding quotes are not added here.
fn escape_token(s: &str) -> String {
    let mut escaped = String::new();
    for ch in s.chars() {
        let replacement = match ch {
            '\\' => Some("\\\\"),
            '"' => Some("\\\""),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            _ => None,
        };
        match replacement {
            Some(sequence) => escaped.push_str(sequence),
            None => escaped.push(ch),
        }
    }
    escaped
}
605
606fn format_token(s: &str) -> String {
607    if needs_quote(s) {
608        format!("\"{}\"", escape_token(s))
609    } else {
610        s.to_string()
611    }
612}