1use anyhow::{Context, Result};
6use oxur_repl::server::ReplServer;
7use std::time::Duration;
8use tokio::io::AsyncWriteExt;
9use tokio::net::TcpStream;
10use tokio::signal;
11
12pub async fn run(addr: &str, ack_port: Option<u16>) -> Result<()> {
17 let mut server = ReplServer::new(addr).with_shutdown_timeout(Duration::from_secs(30));
19
20 let shutdown_handle = server.shutdown_handle();
22
23 if let Some(port) = ack_port {
25 send_ack(addr, port).await?;
26 }
27
28 tokio::spawn(async move {
30 wait_for_shutdown_signal().await;
31 eprintln!("\n[INFO] Received shutdown signal, stopping server...");
32 shutdown_handle.shutdown();
33 });
34
35 server.start().await.context("Server error")?;
37
38 Ok(())
39}
40
41async fn wait_for_shutdown_signal() {
43 let _ = signal::ctrl_c().await;
45}
46
47async fn send_ack(our_addr: &str, ack_port: u16) -> Result<()> {
52 let ack_addr = format!("127.0.0.1:{}", ack_port);
53
54 eprintln!("[INFO] Sending ACK to {}", ack_addr);
55
56 match TcpStream::connect(&ack_addr).await {
57 Ok(mut stream) => {
58 let ack_msg = format!("{{:port {}}}\n", extract_port(our_addr));
60 stream.write_all(ack_msg.as_bytes()).await.context("Failed to send ACK")?;
61 stream.shutdown().await.context("Failed to close ACK connection")?;
62 eprintln!("[INFO] ACK sent successfully");
63 Ok(())
64 }
65 Err(e) => {
66 eprintln!("[WARN] Failed to send ACK to {}: {}", ack_addr, e);
67 Ok(())
69 }
70 }
71}
72
73fn extract_port(addr: &str) -> &str {
75 addr.rsplit(':').next().unwrap_or("0")
76}
77
78#[cfg(test)]
79mod tests {
80 use super::*;
81
82 #[test]
83 fn test_extract_port() {
84 assert_eq!(extract_port("127.0.0.1:5099"), "5099");
85 assert_eq!(extract_port("localhost:9000"), "9000");
86 assert_eq!(extract_port("[::1]:8080"), "8080");
87 }
88}