1use std::os::unix::net::UnixListener;
2use std::io::{Read, Write};
3use anyhow::Result;
4
5pub use cell_macros::{service_schema, call_as};
7
8pub fn run_service_with_schema<F>(
10 service_name: &str,
11 schema_json: &str,
12 handler: F,
13) -> Result<()>
14where
15 F: Fn(&str) -> Result<String>,
16{
17 let socket_path = std::env::var("CELL_SOCKET_PATH")
18 .unwrap_or_else(|_| format!("/tmp/cell/sockets/{}.sock", service_name));
19
20 let _ = std::fs::remove_file(&socket_path);
21
22 println!("🚀 Service '{}' starting", service_name);
23 println!(" Socket: {}", socket_path);
24
25 let listener = UnixListener::bind(&socket_path)?;
26 println!("✓ Service ready");
27
28 for stream in listener.incoming() {
29 match stream {
30 Ok(mut stream) => {
31 let mut len_buf = [0u8; 4];
33 match stream.read_exact(&mut len_buf) {
34 Ok(_) => {
35 let len = u32::from_be_bytes(len_buf) as usize;
36
37 if len > 16 * 1024 * 1024 {
39 eprintln!("❌ Message too large: {} bytes", len);
40 continue;
41 }
42
43 let mut msg_buf = vec![0u8; len];
44 if let Err(e) = stream.read_exact(&mut msg_buf) {
45 eprintln!("❌ Read error: {}", e);
46 continue;
47 }
48
49 if &msg_buf == b"__SCHEMA__" {
51 let response = schema_json.as_bytes();
52 stream.write_all(&(response.len() as u32).to_be_bytes())?;
53 stream.write_all(response)?;
54 stream.flush()?;
55 continue;
56 }
57
58 let request_json = std::str::from_utf8(&msg_buf).unwrap_or("");
60 match handler(request_json) {
61 Ok(response_json) => {
62 let response = response_json.as_bytes();
63 stream.write_all(&(response.len() as u32).to_be_bytes())?;
64 stream.write_all(response)?;
65 stream.flush()?;
66 }
67 Err(e) => {
68 eprintln!("❌ Handler error: {}", e);
69 }
70 }
71 }
72 Err(_) => {
73 break;
75 }
76 }
77 }
78 Err(e) => eprintln!("❌ Accept error: {}", e),
79 }
80 }
81
82 Ok(())
83}
84
85#[cfg(feature = "build")]
90pub mod build {
91 use std::os::unix::net::UnixStream;
92 use std::io::{Read, Write};
93 use std::path::Path;
94 use std::fs;
95 use std::time::Duration;
96
97 pub fn fetch_and_cache_schema(service_name: &str, out_dir: &Path) -> Result<(), String> {
99 let socket_path = format!("/tmp/cell/sockets/{}.sock", service_name);
100
101 let mut stream = UnixStream::connect(&socket_path)
102 .map_err(|e| format!("Service '{}' not running: {}", service_name, e))?;
103
104 stream.set_read_timeout(Some(Duration::from_secs(2)))
105 .map_err(|e| format!("Timeout error: {}", e))?;
106
107 let request = b"__SCHEMA__";
109 stream.write_all(&(request.len() as u32).to_be_bytes())
110 .map_err(|e| format!("Write error: {}", e))?;
111 stream.write_all(request)
112 .map_err(|e| format!("Write error: {}", e))?;
113 stream.flush()
114 .map_err(|e| format!("Flush error: {}", e))?;
115
116 let mut len_buf = [0u8; 4];
118 stream.read_exact(&mut len_buf)
119 .map_err(|e| format!("Read length error: {}", e))?;
120
121 let len = u32::from_be_bytes(len_buf) as usize;
122 let mut schema_buf = vec![0u8; len];
123 stream.read_exact(&mut schema_buf)
124 .map_err(|e| format!("Read schema error: {}", e))?;
125
126 let schema_hash = blake3::hash(&schema_buf).to_hex().to_string();
127
128 let schema_path = out_dir.join(format!("{}_schema.json", service_name));
130 fs::write(&schema_path, &schema_buf)
131 .map_err(|e| format!("Failed to write schema: {}", e))?;
132
133 let hash_path = out_dir.join(format!("{}_hash.txt", service_name));
135 fs::write(&hash_path, schema_hash.as_bytes())
136 .map_err(|e| format!("Failed to write hash: {}", e))?;
137
138 println!("cargo:warning=✓ Cached schema for '{}' (hash: {})", service_name, &schema_hash[..16]);
139 println!("cargo:rerun-if-changed={}", socket_path);
140
141 Ok(())
142 }
143}