1use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use tokio::fs;
9use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
10use tokio::net::{UnixListener, UnixStream};
11use tokio::signal;
12use tokio::sync::RwLock;
13use tokio::time::{Duration, sleep};
14
15mod request;
16mod response;
17mod router;
18
19pub use request::Request;
20pub use response::Response;
21pub use router::{Method, Router};
22
/// Shared, thread-safe request handler: a function from [`Request`] to [`Response`]
/// behind an [`Arc`], so the same handler can be cloned cheaply and called from any task.
pub type HandlerFn = Arc<dyn Fn(Request) -> Response + Send + Sync>;

/// Shared, thread-safe callback that receives each log message as a `&str`.
pub type LogCallbackFn = Arc<dyn Fn(&str) + Send + Sync>;

/// Shared, thread-safe callback that receives each user-facing prompt message as a `&str`.
pub type PromptCallbackFn = Arc<dyn Fn(&str) + Send + Sync>;
31
/// A small server that listens on a Unix domain socket and dispatches incoming
/// requests to handlers stored in a shared [`Router`].
pub struct LazySock {
    /// Filesystem path of the Unix socket the server binds to.
    socket_path: PathBuf,
    /// Route table, shared with every spawned connection task.
    router: Arc<RwLock<Router>>,
    /// Optional sink for log messages; when `None`, log messages are dropped.
    log_callback: Option<LogCallbackFn>,
    /// Optional sink for prompt messages; when `None`, prompt messages are dropped.
    prompt_callback: Option<PromptCallbackFn>,
    /// Whether the socket file is removed when the server shuts down on Ctrl+C.
    cleanup_on_exit: bool,
}
40
41impl LazySock {
42 pub fn new<P: AsRef<Path>>(socket_path: P) -> Self {
44 Self {
45 socket_path: socket_path.as_ref().to_path_buf(),
46 router: Arc::new(RwLock::new(Router::new())),
47 log_callback: None,
48 prompt_callback: None,
49 cleanup_on_exit: true,
50 }
51 }
52
53 pub fn with_log_callback<F>(mut self, callback: F) -> Self
55 where
56 F: Fn(&str) + Send + Sync + 'static,
57 {
58 self.log_callback = Some(Arc::new(callback));
59 self
60 }
61
62 pub fn with_prompt_callback<F>(mut self, callback: F) -> Self
64 where
65 F: Fn(&str) + Send + Sync + 'static,
66 {
67 self.prompt_callback = Some(Arc::new(callback));
68 self
69 }
70
71 pub fn with_cleanup_on_exit(mut self, cleanup: bool) -> Self {
73 self.cleanup_on_exit = cleanup;
74 self
75 }
76
77 pub async fn route<F>(&self, method: Method, path: &str, handler: F)
79 where
80 F: Fn(Request) -> Response + Send + Sync + 'static,
81 {
82 let mut router = self.router.write().await;
83 router.add_route(method, path, Arc::new(handler));
84 }
85
86 pub async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
88 if let Err(e) = self.check_and_handle_existing_socket().await {
89 return Err(e);
90 }
91
92 let listener = UnixListener::bind(&self.socket_path)?;
93 self.log(&format!("Server started on socket: {:?}", self.socket_path));
94
95 let socket_path_for_cleanup = self.socket_path.clone();
96 let cleanup_on_exit = self.cleanup_on_exit;
97
98 loop {
99 tokio::select! {
100 result = listener.accept() => {
101 match result {
102 Ok((stream, _)) => {
103 let router = Arc::clone(&self.router);
104 let log_callback = self.log_callback.clone();
105 tokio::spawn(async move {
106 if let Err(e) = handle_connection(stream, router).await {
107 if let Some(logger) = log_callback {
108 logger(&format!("Error handling connection: {}", e));
109 }
110 }
111 });
112 }
113 Err(e) => {
114 self.log(&format!("Error accepting connection: {}", e));
115 }
116 }
117 }
118 _ = signal::ctrl_c() => {
119 self.log("Server shutting down...");
120 if cleanup_on_exit {
121 let _ = fs::remove_file(&socket_path_for_cleanup).await;
122 self.log(&format!("Cleaned up socket file: {:?}", socket_path_for_cleanup));
123 }
124 break;
125 }
126 }
127 }
128
129 Ok(())
130 }
131
132 async fn check_and_handle_existing_socket(&self) -> Result<(), Box<dyn std::error::Error>> {
134 if self.socket_path.exists() {
135 self.prompt(
136 "Socket file already exists. Will override in 3 seconds... (Ctrl+C to abort now)",
137 );
138
139 tokio::select! {
140 _ = sleep(Duration::from_secs(3)) => {
141 fs::remove_file(&self.socket_path).await?;
142 self.log("Removed existing socket file.");
143 }
144 _ = signal::ctrl_c() => {
145 self.prompt("Aborted by user.");
146 return Err("User aborted".into());
147 }
148 }
149 }
150 Ok(())
151 }
152
153 fn log(&self, message: &str) {
155 if let Some(callback) = &self.log_callback {
156 callback(message);
157 }
158 }
159
160 fn prompt(&self, message: &str) {
162 if let Some(callback) = &self.prompt_callback {
163 callback(message);
164 }
165 }
166}
167
168async fn handle_connection(
170 mut stream: UnixStream,
171 router: Arc<RwLock<Router>>,
172) -> Result<(), Box<dyn std::error::Error>> {
173 let mut reader = BufReader::new(&mut stream);
174 let mut request_line = String::new();
175 reader.read_line(&mut request_line).await?;
176
177 let parts: Vec<&str> = request_line.trim().split_whitespace().collect();
178 if parts.len() < 2 {
179 return Err("Invalid request line".into());
180 }
181
182 let method = Method::from_str(parts[0]).ok_or("Unsupported HTTP method")?;
183 let path = parts[1].to_string();
184
185 let mut headers = HashMap::new();
186 let mut line = String::new();
187 loop {
188 reader.read_line(&mut line).await?;
189 if line.trim().is_empty() {
190 break;
191 }
192 if let Some((key, value)) = line.split_once(':') {
193 headers.insert(key.trim().to_string(), value.trim().to_string());
194 }
195 line.clear();
196 }
197
198 let mut body = Vec::new();
199 if let Some(content_length_str) = headers.get("Content-Length") {
200 if let Ok(content_length) = content_length_str.parse::<usize>() {
201 if content_length > 0 {
202 body.resize(content_length, 0);
203 reader.read_exact(&mut body).await?;
204 }
205 }
206 }
207
208 let request = Request::new(method.clone(), path, headers, body);
209 let router_guard = router.read().await;
210
211 let response =
212 if let Some(handler) = router_guard.find_handler(&method, request.path_without_query()) {
213 handler(request)
214 } else {
215 Response::not_found("Route not found")
216 };
217
218 let response_data = response.to_http_response();
219 stream.write_all(response_data.as_bytes()).await?;
220 stream.flush().await?;
221
222 Ok(())
223}
224
/// Builds a `$crate::LazySock` for `$path` whose log callback and prompt
/// callback both forward each message to `fancy_log::log` at
/// `fancy_log::LogLevel::Info`.
///
/// The expansion refers to `fancy_log` by its plain path, so that path must
/// resolve at the place where the macro is invoked.
#[macro_export]
macro_rules! lazy_sock {
    ($path:expr) => {{
        let server = $crate::LazySock::new($path);
        let server =
            server.with_log_callback(|text| fancy_log::log(fancy_log::LogLevel::Info, text));
        server.with_prompt_callback(|text| fancy_log::log(fancy_log::LogLevel::Info, text))
    }};
}
233}