Skip to main content

kaish_kernel/
rpc.rs

1//! Cap'n Proto RPC server for the Kernel (核).
2//!
3//! This module implements the Kernel interface defined in `schema/kaish.capnp`,
4//! allowing remote clients to connect to a kernel via Unix sockets.
5//!
6//! # Architecture
7//!
8//! ```text
9//! Client                    Server
10//!   │                         │
11//!   │─── Unix Socket ────────▶│
12//!   │                         │
13//!   │    Cap'n Proto RPC      │
14//!   │◀───────────────────────▶│
15//!   │                         │
16//!   │         Kernel          │
17//!   │    (execute, vars, etc) │
18//! ```
19
20use std::path::{Path, PathBuf};
21use std::rc::Rc;
22use std::sync::Arc;
23
24use anyhow::{Context, Result};
25use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
26use futures::AsyncReadExt;
27use tokio::net::UnixListener;
28use tokio::sync::Mutex;
29use tokio_util::compat::TokioAsyncReadCompatExt;
30
31use kaish_schema::{blob_sink, blob_stream, kernel};
32
33use crate::kernel::Kernel;
34use crate::paths as state_paths;
35use crate::vfs::{Filesystem, VfsRouter};
36
/// RPC server wrapper around a Kernel.
///
/// Implements the Cap'n Proto Kernel interface to serve remote requests.
pub struct KernelRpcServer {
    /// Kernel shared (through the `Arc`) with the handler object that is
    /// created for every accepted connection.
    kernel: Arc<Kernel>,
}
43
44impl KernelRpcServer {
45    /// Create a new RPC server wrapping the given kernel.
46    pub fn new(kernel: Kernel) -> Self {
47        Self {
48            kernel: Arc::new(kernel),
49        }
50    }
51
52    /// Serve RPC requests on the default socket path.
53    ///
54    /// Socket is created at `$XDG_RUNTIME_DIR/kaish/<kernel_name>.sock`
55    pub async fn serve_default(&self) -> Result<()> {
56        let socket_path = state_paths::runtime_dir().join(format!("{}.sock", self.kernel.name()));
57        self.serve(&socket_path).await
58    }
59
60    /// Serve RPC requests on the given Unix socket path.
61    pub async fn serve(&self, socket_path: &Path) -> Result<()> {
62        // Ensure parent directory exists
63        if let Some(parent) = socket_path.parent() {
64            std::fs::create_dir_all(parent).ok();
65        }
66
67        // Remove existing socket if present
68        if socket_path.exists() {
69            std::fs::remove_file(socket_path)
70                .with_context(|| format!("removing old socket: {}", socket_path.display()))?;
71        }
72
73        let listener = UnixListener::bind(socket_path)
74            .with_context(|| format!("binding to socket: {}", socket_path.display()))?;
75
76        tracing::info!("Kernel RPC server listening on {}", socket_path.display());
77
78        loop {
79            let (stream, _addr) = listener.accept().await?;
80
81            // Convert tokio stream to futures-compatible stream
82            let stream = stream.compat();
83
84            let (reader, writer) = stream.split();
85
86            // Create the RPC network
87            let network = twoparty::VatNetwork::new(
88                reader,
89                writer,
90                rpc_twoparty_capnp::Side::Server,
91                Default::default(),
92            );
93
94            // Create the kernel server implementation
95            let kernel_impl = KernelImpl::new(self.kernel.clone());
96            let kernel_client: kernel::Client = capnp_rpc::new_client(kernel_impl);
97
98            // Start RPC system
99            let rpc_system = RpcSystem::new(Box::new(network), Some(kernel_client.clone().client));
100
101            // Spawn the RPC system
102            tokio::task::spawn_local(async move {
103                if let Err(e) = rpc_system.await {
104                    tracing::error!("RPC error: {}", e);
105                }
106            });
107        }
108    }
109}
110
111// ============================================================
112// Blob Stream/Sink Implementations
113// ============================================================
114
/// Implementation of BlobStream for reading blobs.
///
/// The whole blob is held in memory and handed out in chunks by `read`.
struct BlobStreamImpl {
    /// Complete blob contents being served.
    data: Vec<u8>,
    /// Offset into `data` of the next byte to hand out; advanced by each `read`.
    position: Mutex<usize>,
}
120
121impl BlobStreamImpl {
122    fn new(data: Vec<u8>) -> Self {
123        Self {
124            data,
125            position: Mutex::new(0),
126        }
127    }
128}
129
130#[allow(refining_impl_trait)]
131impl blob_stream::Server for BlobStreamImpl {
132    fn read(
133        self: Rc<Self>,
134        params: blob_stream::ReadParams,
135        mut results: blob_stream::ReadResults,
136    ) -> capnp::capability::Promise<(), capnp::Error> {
137        let max_bytes = match params.get() {
138            Ok(p) => p.get_max_bytes() as usize,
139            Err(e) => return capnp::capability::Promise::err(e),
140        };
141
142        // Use block_in_place to safely access the mutex in sync context
143        let (chunk, done) = tokio::task::block_in_place(|| {
144            let mut pos = self.position.blocking_lock();
145            let remaining = self.data.len().saturating_sub(*pos);
146            let read_size = max_bytes.min(remaining);
147            let chunk = self.data[*pos..*pos + read_size].to_vec();
148            *pos += read_size;
149            let done = *pos >= self.data.len();
150            (chunk, done)
151        });
152
153        let mut builder = results.get();
154        builder.set_data(&chunk);
155        builder.set_done(done);
156        capnp::capability::Promise::ok(())
157    }
158
159    fn size(
160        self: Rc<Self>,
161        _params: blob_stream::SizeParams,
162        mut results: blob_stream::SizeResults,
163    ) -> capnp::capability::Promise<(), capnp::Error> {
164        let mut builder = results.get();
165        builder.set_bytes(self.data.len() as u64);
166        builder.set_known(true);
167        capnp::capability::Promise::ok(())
168    }
169
170    fn cancel(
171        self: Rc<Self>,
172        _params: blob_stream::CancelParams,
173        _results: blob_stream::CancelResults,
174    ) -> capnp::capability::Promise<(), capnp::Error> {
175        // Nothing to clean up - data is dropped when impl is dropped
176        capnp::capability::Promise::ok(())
177    }
178}
179
/// Implementation of BlobSink for writing blobs.
///
/// Incoming chunks are buffered in memory and written to the VFS in one go
/// when the client calls `finish`.
struct BlobSinkImpl {
    /// Filesystem router the finished blob is written through.
    vfs: Arc<VfsRouter>,
    /// Destination path of the blob inside the VFS.
    path: PathBuf,
    /// Bytes received so far, in arrival order.
    data: Mutex<Vec<u8>>,
    /// Set by `abort`; once set, later chunks are discarded.
    aborted: Mutex<bool>,
}
187
188impl BlobSinkImpl {
189    fn new(vfs: Arc<VfsRouter>, path: PathBuf) -> Self {
190        Self {
191            vfs,
192            path,
193            data: Mutex::new(Vec::new()),
194            aborted: Mutex::new(false),
195        }
196    }
197}
198
199#[allow(refining_impl_trait)]
200impl blob_sink::Server for BlobSinkImpl {
201    fn write(
202        self: Rc<Self>,
203        params: blob_sink::WriteParams,
204        _results: blob_sink::WriteResults,
205    ) -> capnp::capability::Promise<(), capnp::Error> {
206        let chunk = match params.get() {
207            Ok(p) => match p.get_data() {
208                Ok(d) => d.to_vec(),
209                Err(e) => return capnp::capability::Promise::err(e),
210            },
211            Err(e) => return capnp::capability::Promise::err(e),
212        };
213
214        tokio::task::block_in_place(|| {
215            let aborted = self.aborted.blocking_lock();
216            if *aborted {
217                return;
218            }
219            drop(aborted);
220
221            let mut data = self.data.blocking_lock();
222            data.extend(chunk);
223        });
224
225        capnp::capability::Promise::ok(())
226    }
227
228    fn finish(
229        self: Rc<Self>,
230        _params: blob_sink::FinishParams,
231        mut results: blob_sink::FinishResults,
232    ) -> capnp::capability::Promise<(), capnp::Error> {
233        let vfs = self.vfs.clone();
234        let path = self.path.clone();
235
236        // Get the data and compute hash
237        let (data, hash) = tokio::task::block_in_place(|| {
238            let aborted = self.aborted.blocking_lock();
239            if *aborted {
240                return (Vec::new(), Vec::new());
241            }
242            drop(aborted);
243
244            let data = self.data.blocking_lock().clone();
245
246            // Compute SHA-256 hash
247            use std::collections::hash_map::DefaultHasher;
248            use std::hash::{Hash, Hasher};
249            let mut hasher = DefaultHasher::new();
250            data.hash(&mut hasher);
251            let hash_value = hasher.finish();
252            let hash = hash_value.to_be_bytes().to_vec();
253
254            (data, hash)
255        });
256
257        capnp::capability::Promise::from_future(async move {
258            // Ensure parent directory exists
259            let parent = path.parent().unwrap_or(Path::new("/v/blobs"));
260            if let Err(e) = vfs.mkdir(parent).await {
261                // Ignore "already exists" errors
262                if e.kind() != std::io::ErrorKind::AlreadyExists {
263                    tracing::warn!("Failed to create blob directory: {}", e);
264                }
265            }
266
267            // Write the blob
268            vfs.write(&path, &data).await.map_err(|e| {
269                capnp::Error::failed(format!("failed to write blob: {}", e))
270            })?;
271
272            results.get().set_hash(&hash);
273            Ok(())
274        })
275    }
276
277    fn abort(
278        self: Rc<Self>,
279        _params: blob_sink::AbortParams,
280        _results: blob_sink::AbortResults,
281    ) -> capnp::capability::Promise<(), capnp::Error> {
282        tokio::task::block_in_place(|| {
283            let mut aborted = self.aborted.blocking_lock();
284            *aborted = true;
285        });
286        capnp::capability::Promise::ok(())
287    }
288}
289
/// Produce a process-unique blob identifier of the form `<nanos>-<seq>`.
///
/// Both parts are lowercase hexadecimal: the first is the wall-clock time in
/// nanoseconds since the Unix epoch (0 if the clock reads earlier than the
/// epoch), the second is a counter that increases by one on every call.
fn generate_blob_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = NEXT_SEQ.fetch_add(1, Ordering::SeqCst);

    format!("{:x}-{:x}", nanos, seq)
}
305
/// Implementation of the Kernel Cap'n Proto interface.
struct KernelImpl {
    /// Shared kernel that every RPC method delegates to.
    kernel: Arc<Kernel>,
}
310
311impl KernelImpl {
312    fn new(kernel: Arc<Kernel>) -> Self {
313        Self { kernel }
314    }
315}
316
317#[allow(refining_impl_trait)]
318impl kernel::Server for KernelImpl {
319    /// Execute kaish code and return the result.
320    fn execute(
321        self: Rc<Self>,
322        params: kernel::ExecuteParams,
323        mut results: kernel::ExecuteResults,
324    ) -> capnp::capability::Promise<(), capnp::Error> {
325        let kernel = self.kernel.clone();
326
327        let input = match params.get() {
328            Ok(p) => match p.get_input() {
329                Ok(s) => match s.to_str() {
330                    Ok(s) => s.to_string(),
331                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
332                },
333                Err(e) => return capnp::capability::Promise::err(e),
334            },
335            Err(e) => return capnp::capability::Promise::err(e),
336        };
337
338        capnp::capability::Promise::from_future(async move {
339            let exec_result = kernel.execute(&input).await.map_err(|e| {
340                capnp::Error::failed(format!("execution error: {}", e))
341            })?;
342
343            // Build the result
344            let mut result_builder = results.get().init_result();
345            result_builder.set_code(exec_result.code as i32);
346            result_builder.set_ok(exec_result.ok());
347            result_builder.set_err(&exec_result.err);
348            result_builder.set_stdout(exec_result.out.as_bytes());
349            result_builder.set_stderr(b"");
350
351            // Set data if present
352            if let Some(data) = &exec_result.data {
353                set_value(&mut result_builder.reborrow().init_data(), data);
354            }
355
356            // Set structured output data
357            if let Some(ref output) = exec_result.output {
358                set_output_data(&mut result_builder.reborrow().init_output(), output);
359            }
360
361            Ok(())
362        })
363    }
364
365    /// Get a variable value.
366    fn get_var(
367        self: Rc<Self>,
368        params: kernel::GetVarParams,
369        mut results: kernel::GetVarResults,
370    ) -> capnp::capability::Promise<(), capnp::Error> {
371        let kernel = self.kernel.clone();
372
373        let name = match params.get() {
374            Ok(p) => match p.get_name() {
375                Ok(s) => match s.to_str() {
376                    Ok(s) => s.to_string(),
377                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
378                },
379                Err(e) => return capnp::capability::Promise::err(e),
380            },
381            Err(e) => return capnp::capability::Promise::err(e),
382        };
383
384        capnp::capability::Promise::from_future(async move {
385            if let Some(value) = kernel.get_var(&name).await {
386                set_value(&mut results.get().init_value(), &value);
387            }
388
389            Ok(())
390        })
391    }
392
393    /// Set a variable value.
394    fn set_var(
395        self: Rc<Self>,
396        params: kernel::SetVarParams,
397        _results: kernel::SetVarResults,
398    ) -> capnp::capability::Promise<(), capnp::Error> {
399        let kernel = self.kernel.clone();
400
401        let (name, value) = match params.get() {
402            Ok(p) => {
403                let name = match p.get_name() {
404                    Ok(s) => match s.to_str() {
405                        Ok(s) => s.to_string(),
406                        Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
407                    },
408                    Err(e) => return capnp::capability::Promise::err(e),
409                };
410                let value_reader = match p.get_value() {
411                    Ok(v) => v,
412                    Err(e) => return capnp::capability::Promise::err(e),
413                };
414                let value = match read_value(&value_reader) {
415                    Ok(v) => v,
416                    Err(e) => return capnp::capability::Promise::err(e),
417                };
418                (name, value)
419            }
420            Err(e) => return capnp::capability::Promise::err(e),
421        };
422
423        capnp::capability::Promise::from_future(async move {
424            kernel.set_var(&name, value).await;
425            Ok(())
426        })
427    }
428
429    /// List all variables.
430    fn list_vars(
431        self: Rc<Self>,
432        _params: kernel::ListVarsParams,
433        mut results: kernel::ListVarsResults,
434    ) -> capnp::capability::Promise<(), capnp::Error> {
435        let kernel = self.kernel.clone();
436
437        capnp::capability::Promise::from_future(async move {
438            let vars = kernel.list_vars().await;
439
440            let mut list = results.get().init_vars(vars.len() as u32);
441            for (i, (name, value)) in vars.into_iter().enumerate() {
442                let mut entry = list.reborrow().get(i as u32);
443                entry.set_key(&name);
444                set_value(&mut entry.init_value(), &value);
445            }
446
447            Ok(())
448        })
449    }
450
451    /// List available tools.
452    fn list_tools(
453        self: Rc<Self>,
454        _params: kernel::ListToolsParams,
455        mut results: kernel::ListToolsResults,
456    ) -> capnp::capability::Promise<(), capnp::Error> {
457        let schemas = self.kernel.tool_schemas();
458
459        let mut list = results.get().init_tools(schemas.len() as u32);
460        for (i, schema) in schemas.into_iter().enumerate() {
461            let mut entry = list.reborrow().get(i as u32);
462            entry.set_name(&schema.name);
463            entry.set_description(&schema.description);
464            // source defaults to builtin
465        }
466
467        capnp::capability::Promise::ok(())
468    }
469
470    /// Ping the kernel (health check).
471    fn ping(
472        self: Rc<Self>,
473        _params: kernel::PingParams,
474        mut results: kernel::PingResults,
475    ) -> capnp::capability::Promise<(), capnp::Error> {
476        results.get().set_pong("pong");
477        capnp::capability::Promise::ok(())
478    }
479
480    /// Shutdown the kernel.
481    ///
482    /// Currently logs the request and returns success. Actual process termination
483    /// is handled by the calling frontend (REPL, MCP server) after receiving the
484    /// successful response.
485    fn shutdown(
486        self: Rc<Self>,
487        _params: kernel::ShutdownParams,
488        _results: kernel::ShutdownResults,
489    ) -> capnp::capability::Promise<(), capnp::Error> {
490        tracing::info!("Kernel shutdown requested via RPC");
491        capnp::capability::Promise::ok(())
492    }
493
494    /// Reset kernel state.
495    fn reset(
496        self: Rc<Self>,
497        _params: kernel::ResetParams,
498        _results: kernel::ResetResults,
499    ) -> capnp::capability::Promise<(), capnp::Error> {
500        let kernel = self.kernel.clone();
501
502        capnp::capability::Promise::from_future(async move {
503            kernel.reset().await.map_err(|e| {
504                capnp::Error::failed(format!("reset error: {}", e))
505            })?;
506            Ok(())
507        })
508    }
509
510    // --- Working Directory ---
511
512    /// Get the current working directory.
513    fn get_cwd(
514        self: Rc<Self>,
515        _params: kernel::GetCwdParams,
516        mut results: kernel::GetCwdResults,
517    ) -> capnp::capability::Promise<(), capnp::Error> {
518        let kernel = self.kernel.clone();
519
520        capnp::capability::Promise::from_future(async move {
521            let cwd = kernel.cwd().await;
522            results.get().set_path(cwd.to_string_lossy());
523            Ok(())
524        })
525    }
526
527    /// Set the current working directory.
528    fn set_cwd(
529        self: Rc<Self>,
530        params: kernel::SetCwdParams,
531        mut results: kernel::SetCwdResults,
532    ) -> capnp::capability::Promise<(), capnp::Error> {
533        let kernel = self.kernel.clone();
534
535        let path = match params.get() {
536            Ok(p) => match p.get_path() {
537                Ok(s) => match s.to_str() {
538                    Ok(s) => s.to_string(),
539                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
540                },
541                Err(e) => return capnp::capability::Promise::err(e),
542            },
543            Err(e) => return capnp::capability::Promise::err(e),
544        };
545
546        capnp::capability::Promise::from_future(async move {
547            kernel.set_cwd(std::path::PathBuf::from(&path)).await;
548            let mut r = results.get();
549            r.set_success(true);
550            r.set_error("");
551            Ok(())
552        })
553    }
554
555    // --- Last Result ---
556
557    /// Get the last execution result ($?).
558    fn get_last_result(
559        self: Rc<Self>,
560        _params: kernel::GetLastResultParams,
561        mut results: kernel::GetLastResultResults,
562    ) -> capnp::capability::Promise<(), capnp::Error> {
563        let kernel = self.kernel.clone();
564
565        capnp::capability::Promise::from_future(async move {
566            let exec_result = kernel.last_result().await;
567
568            let mut result_builder = results.get().init_result();
569            result_builder.set_code(exec_result.code as i32);
570            result_builder.set_ok(exec_result.ok());
571            result_builder.set_err(&exec_result.err);
572            result_builder.set_stdout(exec_result.out.as_bytes());
573            result_builder.set_stderr(b"");
574
575            // Set data if present
576            if let Some(data) = &exec_result.data {
577                set_value(&mut result_builder.reborrow().init_data(), data);
578            }
579
580            // Set structured output data
581            if let Some(ref output) = exec_result.output {
582                set_output_data(&mut result_builder.reborrow().init_output(), output);
583            }
584
585            Ok(())
586        })
587    }
588
589    // --- Placeholder implementations for remaining methods ---
590
591    fn execute_streaming(
592        self: Rc<Self>,
593        _params: kernel::ExecuteStreamingParams,
594        _results: kernel::ExecuteStreamingResults,
595    ) -> capnp::capability::Promise<(), capnp::Error> {
596        capnp::capability::Promise::err(capnp::Error::unimplemented(
597            "execute_streaming not yet implemented".into(),
598        ))
599    }
600
601    fn call_tool(
602        self: Rc<Self>,
603        _params: kernel::CallToolParams,
604        _results: kernel::CallToolResults,
605    ) -> capnp::capability::Promise<(), capnp::Error> {
606        capnp::capability::Promise::err(capnp::Error::unimplemented(
607            "call_tool not yet implemented".into(),
608        ))
609    }
610
611    fn mount(
612        self: Rc<Self>,
613        _params: kernel::MountParams,
614        _results: kernel::MountResults,
615    ) -> capnp::capability::Promise<(), capnp::Error> {
616        capnp::capability::Promise::err(capnp::Error::unimplemented(
617            "mount not yet implemented".into(),
618        ))
619    }
620
621    fn unmount(
622        self: Rc<Self>,
623        _params: kernel::UnmountParams,
624        _results: kernel::UnmountResults,
625    ) -> capnp::capability::Promise<(), capnp::Error> {
626        capnp::capability::Promise::err(capnp::Error::unimplemented(
627            "unmount not yet implemented".into(),
628        ))
629    }
630
631    fn list_mounts(
632        self: Rc<Self>,
633        _params: kernel::ListMountsParams,
634        _results: kernel::ListMountsResults,
635    ) -> capnp::capability::Promise<(), capnp::Error> {
636        capnp::capability::Promise::err(capnp::Error::unimplemented(
637            "list_mounts not yet implemented".into(),
638        ))
639    }
640
641    fn register_mcp(
642        self: Rc<Self>,
643        _params: kernel::RegisterMcpParams,
644        _results: kernel::RegisterMcpResults,
645    ) -> capnp::capability::Promise<(), capnp::Error> {
646        capnp::capability::Promise::err(capnp::Error::unimplemented(
647            "register_mcp not yet implemented".into(),
648        ))
649    }
650
651    fn unregister_mcp(
652        self: Rc<Self>,
653        _params: kernel::UnregisterMcpParams,
654        _results: kernel::UnregisterMcpResults,
655    ) -> capnp::capability::Promise<(), capnp::Error> {
656        capnp::capability::Promise::err(capnp::Error::unimplemented(
657            "unregister_mcp not yet implemented".into(),
658        ))
659    }
660
661    fn list_mcp_servers(
662        self: Rc<Self>,
663        _params: kernel::ListMcpServersParams,
664        _results: kernel::ListMcpServersResults,
665    ) -> capnp::capability::Promise<(), capnp::Error> {
666        capnp::capability::Promise::err(capnp::Error::unimplemented(
667            "list_mcp_servers not yet implemented".into(),
668        ))
669    }
670
671    fn snapshot(
672        self: Rc<Self>,
673        _params: kernel::SnapshotParams,
674        _results: kernel::SnapshotResults,
675    ) -> capnp::capability::Promise<(), capnp::Error> {
676        capnp::capability::Promise::err(capnp::Error::unimplemented(
677            "snapshot not yet implemented".into(),
678        ))
679    }
680
681    fn restore(
682        self: Rc<Self>,
683        _params: kernel::RestoreParams,
684        _results: kernel::RestoreResults,
685    ) -> capnp::capability::Promise<(), capnp::Error> {
686        capnp::capability::Promise::err(capnp::Error::unimplemented(
687            "restore not yet implemented".into(),
688        ))
689    }
690
691    fn read_blob(
692        self: Rc<Self>,
693        params: kernel::ReadBlobParams,
694        mut results: kernel::ReadBlobResults,
695    ) -> capnp::capability::Promise<(), capnp::Error> {
696        let vfs = self.kernel.vfs();
697
698        let id = match params.get() {
699            Ok(p) => match p.get_id() {
700                Ok(s) => match s.to_str() {
701                    Ok(s) => s.to_string(),
702                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
703                },
704                Err(e) => return capnp::capability::Promise::err(e),
705            },
706            Err(e) => return capnp::capability::Promise::err(e),
707        };
708
709        capnp::capability::Promise::from_future(async move {
710            let path = PathBuf::from(format!("/v/blobs/{}", id));
711
712            // Read the blob data
713            let data = vfs.read(&path).await.map_err(|e| {
714                capnp::Error::failed(format!("failed to read blob {}: {}", id, e))
715            })?;
716
717            // Create the stream implementation
718            let stream_impl = BlobStreamImpl::new(data);
719            let stream_client: blob_stream::Client = capnp_rpc::new_client(stream_impl);
720
721            results.get().set_stream(stream_client);
722            Ok(())
723        })
724    }
725
726    fn write_blob(
727        self: Rc<Self>,
728        params: kernel::WriteBlobParams,
729        mut results: kernel::WriteBlobResults,
730    ) -> capnp::capability::Promise<(), capnp::Error> {
731        let vfs = self.kernel.vfs();
732
733        let (content_type, _size) = match params.get() {
734            Ok(p) => {
735                let ct = match p.get_content_type() {
736                    Ok(s) => match s.to_str() {
737                        Ok(s) => s.to_string(),
738                        Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
739                    },
740                    Err(e) => return capnp::capability::Promise::err(e),
741                };
742                let size = p.get_size();
743                (ct, size)
744            },
745            Err(e) => return capnp::capability::Promise::err(e),
746        };
747
748        // Generate a unique blob ID
749        let id = generate_blob_id();
750        let path = PathBuf::from(format!("/v/blobs/{}", id));
751
752        // Store content type as metadata (could be extended later)
753        tracing::debug!("Creating blob {} with content type {}", id, content_type);
754
755        // Create the sink implementation
756        let sink_impl = BlobSinkImpl::new(vfs, path);
757        let sink_client: blob_sink::Client = capnp_rpc::new_client(sink_impl);
758
759        let mut builder = results.get();
760        builder.set_id(&id);
761        builder.set_stream(sink_client);
762
763        capnp::capability::Promise::ok(())
764    }
765
766    fn delete_blob(
767        self: Rc<Self>,
768        params: kernel::DeleteBlobParams,
769        mut results: kernel::DeleteBlobResults,
770    ) -> capnp::capability::Promise<(), capnp::Error> {
771        let vfs = self.kernel.vfs();
772
773        let id = match params.get() {
774            Ok(p) => match p.get_id() {
775                Ok(s) => match s.to_str() {
776                    Ok(s) => s.to_string(),
777                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
778                },
779                Err(e) => return capnp::capability::Promise::err(e),
780            },
781            Err(e) => return capnp::capability::Promise::err(e),
782        };
783
784        capnp::capability::Promise::from_future(async move {
785            let path = PathBuf::from(format!("/v/blobs/{}", id));
786
787            // Delete the blob
788            let success = match vfs.remove(&path).await {
789                Ok(()) => true,
790                Err(e) => {
791                    tracing::warn!("Failed to delete blob {}: {}", id, e);
792                    false
793                }
794            };
795
796            results.get().set_success(success);
797            Ok(())
798        })
799    }
800}
801
802// ============================================================
803// Value Conversion Helpers
804// ============================================================
805
806use kaish_schema::value;
807use kaish_schema::{output_data, output_node};
808use crate::ast::Value;
809use crate::interpreter::{EntryType, OutputData, OutputNode};
810
811/// Convert a kaish Value to a Cap'n Proto Value.
812fn set_value(builder: &mut value::Builder<'_>, value: &Value) {
813    match value {
814        Value::Null => builder.set_null(()),
815        Value::Bool(b) => builder.set_bool(*b),
816        Value::Int(i) => builder.set_int(*i),
817        Value::Float(f) => builder.set_float(*f),
818        Value::String(s) => builder.set_string(s),
819        Value::Json(json) => {
820            // Serialize Json values using the Cap'n Proto array/object types
821            set_json_value(builder.reborrow(), json);
822        }
823        Value::Blob(blob) => {
824            let mut blob_builder = builder.reborrow().init_blob();
825            blob_builder.set_id(&blob.id);
826            blob_builder.set_size(blob.size);
827            blob_builder.set_content_type(&blob.content_type);
828            if let Some(hash) = &blob.hash {
829                blob_builder.set_hash(hash);
830            }
831        }
832    }
833}
834
835/// Helper to serialize serde_json::Value to Cap'n Proto.
836fn set_json_value(mut builder: value::Builder<'_>, json: &serde_json::Value) {
837    match json {
838        serde_json::Value::Null => builder.set_null(()),
839        serde_json::Value::Bool(b) => builder.set_bool(*b),
840        serde_json::Value::Number(n) => {
841            if let Some(i) = n.as_i64() {
842                builder.set_int(i);
843            } else if let Some(f) = n.as_f64() {
844                builder.set_float(f);
845            } else {
846                builder.set_string(n.to_string());
847            }
848        }
849        serde_json::Value::String(s) => builder.set_string(s),
850        serde_json::Value::Array(arr) => {
851            let mut array_builder = builder.init_array(arr.len() as u32);
852            for (i, item) in arr.iter().enumerate() {
853                set_json_value(array_builder.reborrow().get(i as u32), item);
854            }
855        }
856        serde_json::Value::Object(obj) => {
857            let mut object_builder = builder.init_object(obj.len() as u32);
858            for (i, (key, val)) in obj.iter().enumerate() {
859                let mut entry = object_builder.reborrow().get(i as u32);
860                entry.set_key(key);
861                set_json_value(entry.init_value(), val);
862            }
863        }
864    }
865}
866
867/// Convert a kaish OutputData to Cap'n Proto OutputData.
868fn set_output_data(builder: &mut output_data::Builder<'_>, output: &OutputData) {
869    // Set headers if present
870    if let Some(ref headers) = output.headers {
871        let mut headers_builder = builder.reborrow().init_headers(headers.len() as u32);
872        for (i, header) in headers.iter().enumerate() {
873            headers_builder.set(i as u32, header);
874        }
875    }
876
877    // Set root nodes
878    let mut root_builder = builder.reborrow().init_root(output.root.len() as u32);
879    for (i, node) in output.root.iter().enumerate() {
880        let mut node_builder = root_builder.reborrow().get(i as u32);
881        set_output_node(&mut node_builder, node);
882    }
883}
884
885/// Convert a kaish OutputNode to Cap'n Proto OutputNode.
886fn set_output_node(builder: &mut output_node::Builder<'_>, node: &OutputNode) {
887    builder.set_name(&node.name);
888
889    // Set entry type
890    use kaish_schema::kaish_capnp::EntryType as SchemaEntryType;
891    let entry_type = match node.entry_type {
892        EntryType::Text => SchemaEntryType::Text,
893        EntryType::File => SchemaEntryType::File,
894        EntryType::Directory => SchemaEntryType::Directory,
895        EntryType::Executable => SchemaEntryType::Executable,
896        EntryType::Symlink => SchemaEntryType::Symlink,
897    };
898    builder.set_entry_type(entry_type);
899
900    // Set text if present
901    if let Some(ref text) = node.text {
902        builder.set_text(text);
903    }
904
905    // Set cells
906    if !node.cells.is_empty() {
907        let mut cells_builder = builder.reborrow().init_cells(node.cells.len() as u32);
908        for (i, cell) in node.cells.iter().enumerate() {
909            cells_builder.set(i as u32, cell);
910        }
911    }
912
913    // Set children recursively
914    if !node.children.is_empty() {
915        let mut children_builder = builder.reborrow().init_children(node.children.len() as u32);
916        for (i, child) in node.children.iter().enumerate() {
917            let mut child_builder = children_builder.reborrow().get(i as u32);
918            set_output_node(&mut child_builder, child);
919        }
920    }
921}
922
923/// Read a kaish Value from a Cap'n Proto Value.
924///
925/// Arrays and objects are serialized as JSON strings.
926fn read_value(reader: &value::Reader<'_>) -> Result<Value, capnp::Error> {
927    use value::Which;
928    match reader.which()? {
929        Which::Null(()) => Ok(Value::Null),
930        Which::Bool(b) => Ok(Value::Bool(b)),
931        Which::Int(i) => Ok(Value::Int(i)),
932        Which::Float(f) => Ok(Value::Float(f)),
933        Which::String(s) => {
934            let text = s?;
935            let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
936            Ok(Value::String(string.to_string()))
937        }
938        Which::Array(arr) => {
939            // Convert array to JSON string
940            let arr = arr?;
941            let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
942            let json_array = serde_json::Value::Array(items?);
943            Ok(Value::String(json_array.to_string()))
944        }
945        Which::Object(obj) => {
946            // Convert object to JSON string
947            let obj = obj?;
948            let mut map = serde_json::Map::new();
949            for kv in obj.iter() {
950                let key_text = kv.get_key()?;
951                let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
952                let val = read_value_to_json(&kv.get_value()?)?;
953                map.insert(key, val);
954            }
955            Ok(Value::String(serde_json::Value::Object(map).to_string()))
956        }
957        Which::Blob(blob) => {
958            // Convert blob reference to JSON string representation
959            let blob = blob?;
960            let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
961            let size = blob.get_size();
962            let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
963            let hash = blob.get_hash()?;
964
965            let mut map = serde_json::Map::new();
966            map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
967            map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
968            map.insert("size".to_string(), serde_json::Value::Number(size.into()));
969            map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
970            if !hash.is_empty() {
971                let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
972                map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
973            }
974            Ok(Value::String(serde_json::Value::Object(map).to_string()))
975        }
976    }
977}
978
979/// Helper to convert Cap'n Proto Value to serde_json::Value
980fn read_value_to_json(reader: &value::Reader<'_>) -> Result<serde_json::Value, capnp::Error> {
981    use value::Which;
982    match reader.which()? {
983        Which::Null(()) => Ok(serde_json::Value::Null),
984        Which::Bool(b) => Ok(serde_json::Value::Bool(b)),
985        Which::Int(i) => Ok(serde_json::Value::Number(i.into())),
986        Which::Float(f) => Ok(serde_json::Number::from_f64(f)
987            .map(serde_json::Value::Number)
988            .unwrap_or(serde_json::Value::Null)),
989        Which::String(s) => {
990            let text = s?;
991            let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
992            Ok(serde_json::Value::String(string.to_string()))
993        }
994        Which::Array(arr) => {
995            let arr = arr?;
996            let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
997            Ok(serde_json::Value::Array(items?))
998        }
999        Which::Object(obj) => {
1000            let obj = obj?;
1001            let mut map = serde_json::Map::new();
1002            for kv in obj.iter() {
1003                let key_text = kv.get_key()?;
1004                let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
1005                let val = read_value_to_json(&kv.get_value()?)?;
1006                map.insert(key, val);
1007            }
1008            Ok(serde_json::Value::Object(map))
1009        }
1010        Which::Blob(blob) => {
1011            // Convert blob reference to JSON object
1012            let blob = blob?;
1013            let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1014            let size = blob.get_size();
1015            let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1016            let hash = blob.get_hash()?;
1017
1018            let mut map = serde_json::Map::new();
1019            map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
1020            map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
1021            map.insert("size".to_string(), serde_json::Value::Number(size.into()));
1022            map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
1023            if !hash.is_empty() {
1024                let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
1025                map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
1026            }
1027            Ok(serde_json::Value::Object(map))
1028        }
1029    }
1030}