1pub use cell_macros::{call_as, service_schema};
2
3use anyhow::bail;
4use anyhow::Context;
5use anyhow::Result;
6use std::io::{Read, Write};
7use std::os::unix::io::FromRawFd;
8use std::os::unix::net::{UnixListener, UnixStream};
9use std::time::SystemTime;
10
11pub fn run_service_with_schema<F>(service_name: &str, schema_json: &str, handler: F) -> Result<()>
14where
15 F: Fn(&str) -> Result<String> + Send + Sync + 'static,
16{
17 let listener = if let Ok(fd_str) = std::env::var("CELL_SOCKET_FD") {
18 let fd: i32 = fd_str.parse().context("CELL_SOCKET_FD must be numeric")?;
19 unsafe { UnixListener::from_raw_fd(fd) }
20 } else {
21 let path = std::env::var("CELL_SOCKET_PATH")
22 .unwrap_or_else(|_| format!("/tmp/cell/sockets/{}.sock", service_name));
23 let _ = std::fs::remove_file(&path);
24 UnixListener::bind(&path).with_context(|| format!("bind {}", path))?
25 };
26
27 listener.set_nonblocking(false).context("set_nonblocking")?;
28
29 eprintln!(
30 "{} 🚀 Service '{}' ready on {:?}",
31 humantime::format_rfc3339(SystemTime::now()),
32 service_name,
33 listener.local_addr().ok()
34 );
35
36 for stream in listener.incoming() {
37 match stream {
38 Ok(mut s) => {
39 if let Err(e) = handle_one_client(&mut s, schema_json, &handler) {
40 eprintln!(
41 "{} ❌ Handler error: {}",
42 humantime::format_rfc3339(SystemTime::now()),
43 e
44 );
45 }
46 }
47 Err(e) => eprintln!(
48 "{} ❌ Accept error: {}",
49 humantime::format_rfc3339(SystemTime::now()),
50 e
51 ),
52 }
53 }
54 Ok(())
55}
56
57fn handle_one_client(
58 stream: &mut UnixStream,
59 schema_json: &str,
60 handler: &dyn Fn(&str) -> Result<String>,
61) -> Result<()> {
62 let mut len_buf = [0u8; 4];
64 stream.read_exact(&mut len_buf)?;
65 let len = u32::from_be_bytes(len_buf) as usize;
66 if len > 16 * 1024 * 1024 {
67 bail!("message too large: {} bytes", len);
68 }
69
70 let mut msg_buf = vec![0u8; len];
71 stream.read_exact(&mut msg_buf)?;
72
73 if &msg_buf == b"__SCHEMA__" {
75 let resp = schema_json.as_bytes();
76 stream.write_all(&(resp.len() as u32).to_be_bytes())?;
77 stream.write_all(resp)?;
78 return Ok(());
79 }
80
81 let request = std::str::from_utf8(&msg_buf).context("invalid utf-8")?;
83 let response_json = handler(request)?;
84 let resp = response_json.as_bytes();
85 stream.write_all(&(resp.len() as u32).to_be_bytes())?;
86 stream.write_all(resp)?;
87 stream.flush()?;
88 Ok(())
89}
90
/// Helpers meant to be called from a Cargo build script (they print `cargo:`
/// directives on stdout). Only compiled when the `build` feature is enabled.
#[cfg(feature = "build")]
pub mod build {
    use std::fs;
    use std::io::{Read, Write};
    use std::os::unix::net::UnixStream;
    use std::path::Path;
    use std::time::Duration;

    /// Largest schema reply accepted from a service. Mirrors the 16 MiB frame
    /// limit the service side applies to incoming requests.
    const MAX_SCHEMA_BYTES: usize = 16 * 1024 * 1024;

    /// Fetches the schema of a running service and caches it under `out_dir`.
    ///
    /// Connects to `/tmp/cell/sockets/<service_name>.sock`, sends the
    /// length-prefixed `__SCHEMA__` request, and reads the length-prefixed
    /// reply. On success two files are written:
    ///
    /// * `<service_name>_schema.json` — the reply bytes, unmodified;
    /// * `<service_name>_hash.txt` — the hex BLAKE3 hash of those bytes.
    ///
    /// It then prints a `cargo:warning` line with the first 16 hex digits of
    /// the hash and a `cargo:rerun-if-changed` line for the socket path.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message if the service cannot be reached, if
    /// any socket read or write fails (reads time out after 2 seconds), if
    /// the announced schema size exceeds 16 MiB, or if either output file
    /// cannot be written.
    pub fn fetch_and_cache_schema(service_name: &str, out_dir: &Path) -> Result<(), String> {
        let socket_path = format!("/tmp/cell/sockets/{}.sock", service_name);

        let mut stream = UnixStream::connect(&socket_path)
            .map_err(|e| format!("Service '{}' not running: {}", service_name, e))?;

        // Bound how long the build can hang on an unresponsive service.
        stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .map_err(|e| format!("Timeout error: {}", e))?;

        // Request frame: 4-byte big-endian length, then the payload.
        let request = b"__SCHEMA__";
        stream
            .write_all(&(request.len() as u32).to_be_bytes())
            .map_err(|e| format!("Write error: {}", e))?;
        stream
            .write_all(request)
            .map_err(|e| format!("Write error: {}", e))?;
        stream.flush().map_err(|e| format!("Flush error: {}", e))?;

        let mut len_buf = [0u8; 4];
        stream
            .read_exact(&mut len_buf)
            .map_err(|e| format!("Read length error: {}", e))?;

        let len = u32::from_be_bytes(len_buf) as usize;
        // The length comes from whatever is listening on the socket; refuse to
        // allocate an arbitrarily large buffer on its say-so.
        if len > MAX_SCHEMA_BYTES {
            return Err(format!(
                "Schema for '{}' too large: {} bytes (limit {})",
                service_name, len, MAX_SCHEMA_BYTES
            ));
        }

        let mut schema_buf = vec![0u8; len];
        stream
            .read_exact(&mut schema_buf)
            .map_err(|e| format!("Read schema error: {}", e))?;

        let schema_hash = blake3::hash(&schema_buf).to_hex().to_string();

        let schema_path = out_dir.join(format!("{}_schema.json", service_name));
        fs::write(&schema_path, &schema_buf)
            .map_err(|e| format!("Failed to write schema: {}", e))?;

        let hash_path = out_dir.join(format!("{}_hash.txt", service_name));
        fs::write(&hash_path, schema_hash.as_bytes())
            .map_err(|e| format!("Failed to write hash: {}", e))?;

        println!(
            "cargo:warning=✓ Cached schema for '{}' (hash: {})",
            service_name,
            &schema_hash[..16]
        );
        println!("cargo:rerun-if-changed={}", socket_path);

        Ok(())
    }
}