cell_sdk/
lib.rs

1pub use cell_macros::{call_as, service_schema};
2
3use anyhow::bail;
4use anyhow::Context;
5use anyhow::Result;
6use std::io::{Read, Write};
7use std::os::unix::io::FromRawFd;
8use std::os::unix::net::{UnixListener, UnixStream};
9use std::time::SystemTime;
10
11/// Run a service with schema introspection.
12/// If CELL_SOCKET_FD is present we use the inherited listener, otherwise we bind ourselves.
13pub fn run_service_with_schema<F>(service_name: &str, schema_json: &str, handler: F) -> Result<()>
14where
15    F: Fn(&str) -> Result<String> + Send + Sync + 'static,
16{
17    let listener = if let Ok(fd_str) = std::env::var("CELL_SOCKET_FD") {
18        let fd: i32 = fd_str.parse().context("CELL_SOCKET_FD must be numeric")?;
19        unsafe { UnixListener::from_raw_fd(fd) }
20    } else {
21        let path = std::env::var("CELL_SOCKET_PATH")
22            .unwrap_or_else(|_| format!("/tmp/cell/sockets/{}.sock", service_name));
23        let _ = std::fs::remove_file(&path);
24        UnixListener::bind(&path).with_context(|| format!("bind {}", path))?
25    };
26
27    listener.set_nonblocking(false).context("set_nonblocking")?;
28
29    eprintln!(
30        "{} 🚀 Service '{}' ready on {:?}",
31        humantime::format_rfc3339(SystemTime::now()),
32        service_name,
33        listener.local_addr().ok()
34    );
35
36    for stream in listener.incoming() {
37        match stream {
38            Ok(mut s) => {
39                if let Err(e) = handle_one_client(&mut s, schema_json, &handler) {
40                    eprintln!(
41                        "{} ❌ Handler error: {}",
42                        humantime::format_rfc3339(SystemTime::now()),
43                        e
44                    );
45                }
46            }
47            Err(e) => eprintln!(
48                "{} ❌ Accept error: {}",
49                humantime::format_rfc3339(SystemTime::now()),
50                e
51            ),
52        }
53    }
54    Ok(())
55}
56
57fn handle_one_client(
58    stream: &mut UnixStream,
59    schema_json: &str,
60    handler: &dyn Fn(&str) -> Result<String>,
61) -> Result<()> {
62    // read 4-byte length header
63    let mut len_buf = [0u8; 4];
64    stream.read_exact(&mut len_buf)?;
65    let len = u32::from_be_bytes(len_buf) as usize;
66    if len > 16 * 1024 * 1024 {
67        bail!("message too large: {} bytes", len);
68    }
69
70    let mut msg_buf = vec![0u8; len];
71    stream.read_exact(&mut msg_buf)?;
72
73    // schema introspection
74    if &msg_buf == b"__SCHEMA__" {
75        let resp = schema_json.as_bytes();
76        stream.write_all(&(resp.len() as u32).to_be_bytes())?;
77        stream.write_all(resp)?;
78        return Ok(());
79    }
80
81    // normal request
82    let request = std::str::from_utf8(&msg_buf).context("invalid utf-8")?;
83    let response_json = handler(request)?;
84    let resp = response_json.as_bytes();
85    stream.write_all(&(resp.len() as u32).to_be_bytes())?;
86    stream.write_all(resp)?;
87    stream.flush()?;
88    Ok(())
89}
90
91// ============================================
92// Build-time helpers (for build.rs)
93// ============================================
94
#[cfg(feature = "build")]
pub mod build {
    //! Helpers intended to be called from a consumer crate's `build.rs`.

    use std::fs;
    use std::io::{Read, Write};
    use std::os::unix::net::UnixStream;
    use std::path::Path;
    use std::time::Duration;

    /// Largest schema payload accepted from a service, in bytes (16 MiB).
    ///
    /// Mirrors the request-size limit the service side applies, so a bogus
    /// length prefix cannot trigger a multi-gigabyte allocation at build time.
    const MAX_SCHEMA_LEN: usize = 16 * 1024 * 1024;

    /// Fetch the schema from a running service and cache it in `out_dir`.
    ///
    /// Connects to `/tmp/cell/sockets/<service_name>.sock`, sends a
    /// length-prefixed `__SCHEMA__` request, and reads the length-prefixed
    /// reply (reads time out after 2 seconds). On success two files are
    /// written into `out_dir`:
    ///
    /// * `<service_name>_schema.json` — the raw schema bytes as received.
    /// * `<service_name>_hash.txt` — the hex-encoded BLAKE3 hash of those bytes.
    ///
    /// It also prints a `cargo:warning=` line with the first 16 hex digits of
    /// the hash and a `cargo:rerun-if-changed=` line for the socket path.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message if the service cannot be reached, if
    /// any socket read or write fails, if the announced schema length exceeds
    /// 16 MiB, or if either output file cannot be written.
    pub fn fetch_and_cache_schema(service_name: &str, out_dir: &Path) -> Result<(), String> {
        let socket_path = format!("/tmp/cell/sockets/{}.sock", service_name);

        let mut stream = UnixStream::connect(&socket_path)
            .map_err(|e| format!("Service '{}' not running: {}", service_name, e))?;

        // Bound how long the build can stall waiting for the reply.
        stream
            .set_read_timeout(Some(Duration::from_secs(2)))
            .map_err(|e| format!("Timeout error: {}", e))?;

        // Send length-prefixed __SCHEMA__ request
        let request = b"__SCHEMA__";
        stream
            .write_all(&(request.len() as u32).to_be_bytes())
            .map_err(|e| format!("Write error: {}", e))?;
        stream
            .write_all(request)
            .map_err(|e| format!("Write error: {}", e))?;
        stream.flush().map_err(|e| format!("Flush error: {}", e))?;

        // Read length-prefixed response
        let mut len_buf = [0u8; 4];
        stream
            .read_exact(&mut len_buf)
            .map_err(|e| format!("Read length error: {}", e))?;

        let len = u32::from_be_bytes(len_buf) as usize;
        // Validate before allocating: the length comes straight off the wire.
        if len > MAX_SCHEMA_LEN {
            return Err(format!(
                "Schema from '{}' too large: {} bytes (limit {})",
                service_name, len, MAX_SCHEMA_LEN
            ));
        }

        let mut schema_buf = vec![0u8; len];
        stream
            .read_exact(&mut schema_buf)
            .map_err(|e| format!("Read schema error: {}", e))?;

        let schema_hash = blake3::hash(&schema_buf).to_hex().to_string();

        // Write schema to OUT_DIR
        let schema_path = out_dir.join(format!("{}_schema.json", service_name));
        fs::write(&schema_path, &schema_buf)
            .map_err(|e| format!("Failed to write schema: {}", e))?;

        // Write hash
        let hash_path = out_dir.join(format!("{}_hash.txt", service_name));
        fs::write(&hash_path, schema_hash.as_bytes())
            .map_err(|e| format!("Failed to write hash: {}", e))?;

        // Only a 16-character prefix of the hex hash is shown to keep the
        // build output short; the full hash is in the hash file.
        println!(
            "cargo:warning=✓ Cached schema for '{}' (hash: {})",
            service_name,
            &schema_hash[..16]
        );
        println!("cargo:rerun-if-changed={}", socket_path);

        Ok(())
    }
}
157}