Skip to main content

kaish_kernel/
rpc.rs

1//! Cap'n Proto RPC server for the Kernel (核).
2//!
3//! This module implements the Kernel interface defined in `schema/kaish.capnp`,
4//! allowing remote clients to connect to a kernel via Unix sockets.
5//!
6//! # Architecture
7//!
8//! ```text
9//! Client                    Server
10//!   │                         │
11//!   │─── Unix Socket ────────▶│
12//!   │                         │
13//!   │    Cap'n Proto RPC      │
14//!   │◀───────────────────────▶│
15//!   │                         │
16//!   │         Kernel          │
17//!   │    (execute, vars, etc) │
18//! ```
19
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22
23use anyhow::{Context, Result};
24use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
25use futures::AsyncReadExt;
26use tokio::net::UnixListener;
27use tokio::sync::Mutex;
28use tokio_util::compat::TokioAsyncReadCompatExt;
29
30use kaish_schema::{blob_sink, blob_stream, kernel};
31
32use crate::kernel::Kernel;
33use crate::paths as state_paths;
34use crate::vfs::{Filesystem, VfsRouter};
35
36/// RPC server wrapper around a Kernel.
37///
38/// Implements the Cap'n Proto Kernel interface to serve remote requests.
39pub struct KernelRpcServer {
40    kernel: Arc<Kernel>,
41}
42
43impl KernelRpcServer {
44    /// Create a new RPC server wrapping the given kernel.
45    pub fn new(kernel: Kernel) -> Self {
46        Self {
47            kernel: Arc::new(kernel),
48        }
49    }
50
51    /// Serve RPC requests on the default socket path.
52    ///
53    /// Socket is created at `$XDG_RUNTIME_DIR/kaish/<kernel_name>.sock`
54    pub async fn serve_default(&self) -> Result<()> {
55        let socket_path = state_paths::runtime_dir().join(format!("{}.sock", self.kernel.name()));
56        self.serve(&socket_path).await
57    }
58
59    /// Serve RPC requests on the given Unix socket path.
60    pub async fn serve(&self, socket_path: &Path) -> Result<()> {
61        // Ensure parent directory exists
62        if let Some(parent) = socket_path.parent() {
63            std::fs::create_dir_all(parent).ok();
64        }
65
66        // Remove existing socket if present
67        if socket_path.exists() {
68            std::fs::remove_file(socket_path)
69                .with_context(|| format!("removing old socket: {}", socket_path.display()))?;
70        }
71
72        let listener = UnixListener::bind(socket_path)
73            .with_context(|| format!("binding to socket: {}", socket_path.display()))?;
74
75        tracing::info!("Kernel RPC server listening on {}", socket_path.display());
76
77        loop {
78            let (stream, _addr) = listener.accept().await?;
79
80            // Convert tokio stream to futures-compatible stream
81            let stream = stream.compat();
82
83            let (reader, writer) = stream.split();
84
85            // Create the RPC network
86            let network = twoparty::VatNetwork::new(
87                reader,
88                writer,
89                rpc_twoparty_capnp::Side::Server,
90                Default::default(),
91            );
92
93            // Create the kernel server implementation
94            let kernel_impl = KernelImpl::new(self.kernel.clone());
95            let kernel_client: kernel::Client = capnp_rpc::new_client(kernel_impl);
96
97            // Start RPC system
98            let rpc_system = RpcSystem::new(Box::new(network), Some(kernel_client.clone().client));
99
100            // Spawn the RPC system
101            tokio::task::spawn_local(async move {
102                if let Err(e) = rpc_system.await {
103                    tracing::error!("RPC error: {}", e);
104                }
105            });
106        }
107    }
108}
109
110// ============================================================
111// Blob Stream/Sink Implementations
112// ============================================================
113
114/// Implementation of BlobStream for reading blobs.
115struct BlobStreamImpl {
116    data: Vec<u8>,
117    position: Mutex<usize>,
118}
119
120impl BlobStreamImpl {
121    fn new(data: Vec<u8>) -> Self {
122        Self {
123            data,
124            position: Mutex::new(0),
125        }
126    }
127}
128
129impl blob_stream::Server for BlobStreamImpl {
130    fn read(
131        &mut self,
132        params: blob_stream::ReadParams,
133        mut results: blob_stream::ReadResults,
134    ) -> capnp::capability::Promise<(), capnp::Error> {
135        let max_bytes = match params.get() {
136            Ok(p) => p.get_max_bytes() as usize,
137            Err(e) => return capnp::capability::Promise::err(e),
138        };
139
140        // Use block_in_place to safely access the mutex in sync context
141        let (chunk, done) = tokio::task::block_in_place(|| {
142            let mut pos = self.position.blocking_lock();
143            let remaining = self.data.len().saturating_sub(*pos);
144            let read_size = max_bytes.min(remaining);
145            let chunk = self.data[*pos..*pos + read_size].to_vec();
146            *pos += read_size;
147            let done = *pos >= self.data.len();
148            (chunk, done)
149        });
150
151        let mut builder = results.get();
152        builder.set_data(&chunk);
153        builder.set_done(done);
154        capnp::capability::Promise::ok(())
155    }
156
157    fn size(
158        &mut self,
159        _params: blob_stream::SizeParams,
160        mut results: blob_stream::SizeResults,
161    ) -> capnp::capability::Promise<(), capnp::Error> {
162        let mut builder = results.get();
163        builder.set_bytes(self.data.len() as u64);
164        builder.set_known(true);
165        capnp::capability::Promise::ok(())
166    }
167
168    fn cancel(
169        &mut self,
170        _params: blob_stream::CancelParams,
171        _results: blob_stream::CancelResults,
172    ) -> capnp::capability::Promise<(), capnp::Error> {
173        // Nothing to clean up - data is dropped when impl is dropped
174        capnp::capability::Promise::ok(())
175    }
176}
177
178/// Implementation of BlobSink for writing blobs.
179struct BlobSinkImpl {
180    vfs: Arc<VfsRouter>,
181    path: PathBuf,
182    data: Mutex<Vec<u8>>,
183    aborted: Mutex<bool>,
184}
185
186impl BlobSinkImpl {
187    fn new(vfs: Arc<VfsRouter>, path: PathBuf) -> Self {
188        Self {
189            vfs,
190            path,
191            data: Mutex::new(Vec::new()),
192            aborted: Mutex::new(false),
193        }
194    }
195}
196
197impl blob_sink::Server for BlobSinkImpl {
198    fn write(
199        &mut self,
200        params: blob_sink::WriteParams,
201        _results: blob_sink::WriteResults,
202    ) -> capnp::capability::Promise<(), capnp::Error> {
203        let chunk = match params.get() {
204            Ok(p) => match p.get_data() {
205                Ok(d) => d.to_vec(),
206                Err(e) => return capnp::capability::Promise::err(e),
207            },
208            Err(e) => return capnp::capability::Promise::err(e),
209        };
210
211        tokio::task::block_in_place(|| {
212            let aborted = self.aborted.blocking_lock();
213            if *aborted {
214                return;
215            }
216            drop(aborted);
217
218            let mut data = self.data.blocking_lock();
219            data.extend(chunk);
220        });
221
222        capnp::capability::Promise::ok(())
223    }
224
225    fn finish(
226        &mut self,
227        _params: blob_sink::FinishParams,
228        mut results: blob_sink::FinishResults,
229    ) -> capnp::capability::Promise<(), capnp::Error> {
230        let vfs = self.vfs.clone();
231        let path = self.path.clone();
232
233        // Get the data and compute hash
234        let (data, hash) = tokio::task::block_in_place(|| {
235            let aborted = self.aborted.blocking_lock();
236            if *aborted {
237                return (Vec::new(), Vec::new());
238            }
239            drop(aborted);
240
241            let data = self.data.blocking_lock().clone();
242
243            // Compute SHA-256 hash
244            use std::collections::hash_map::DefaultHasher;
245            use std::hash::{Hash, Hasher};
246            let mut hasher = DefaultHasher::new();
247            data.hash(&mut hasher);
248            let hash_value = hasher.finish();
249            let hash = hash_value.to_be_bytes().to_vec();
250
251            (data, hash)
252        });
253
254        capnp::capability::Promise::from_future(async move {
255            // Ensure parent directory exists
256            let parent = path.parent().unwrap_or(Path::new("/v/blobs"));
257            if let Err(e) = vfs.mkdir(parent).await {
258                // Ignore "already exists" errors
259                if e.kind() != std::io::ErrorKind::AlreadyExists {
260                    tracing::warn!("Failed to create blob directory: {}", e);
261                }
262            }
263
264            // Write the blob
265            vfs.write(&path, &data).await.map_err(|e| {
266                capnp::Error::failed(format!("failed to write blob: {}", e))
267            })?;
268
269            results.get().set_hash(&hash);
270            Ok(())
271        })
272    }
273
274    fn abort(
275        &mut self,
276        _params: blob_sink::AbortParams,
277        _results: blob_sink::AbortResults,
278    ) -> capnp::capability::Promise<(), capnp::Error> {
279        tokio::task::block_in_place(|| {
280            let mut aborted = self.aborted.blocking_lock();
281            *aborted = true;
282        });
283        capnp::capability::Promise::ok(())
284    }
285}
286
287/// Generate a unique blob ID.
288fn generate_blob_id() -> String {
289    use std::time::{SystemTime, UNIX_EPOCH};
290    use std::sync::atomic::{AtomicU64, Ordering};
291
292    static COUNTER: AtomicU64 = AtomicU64::new(0);
293
294    let timestamp = SystemTime::now()
295        .duration_since(UNIX_EPOCH)
296        .map(|d| d.as_nanos())
297        .unwrap_or(0);
298    let count = COUNTER.fetch_add(1, Ordering::SeqCst);
299
300    format!("{:x}-{:x}", timestamp, count)
301}
302
303/// Implementation of the Kernel Cap'n Proto interface.
304struct KernelImpl {
305    kernel: Arc<Kernel>,
306}
307
308impl KernelImpl {
309    fn new(kernel: Arc<Kernel>) -> Self {
310        Self { kernel }
311    }
312}
313
314impl kernel::Server for KernelImpl {
315    /// Execute kaish code and return the result.
316    fn execute(
317        &mut self,
318        params: kernel::ExecuteParams,
319        mut results: kernel::ExecuteResults,
320    ) -> capnp::capability::Promise<(), capnp::Error> {
321        let kernel = self.kernel.clone();
322
323        let input = match params.get() {
324            Ok(p) => match p.get_input() {
325                Ok(s) => match s.to_str() {
326                    Ok(s) => s.to_string(),
327                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
328                },
329                Err(e) => return capnp::capability::Promise::err(e),
330            },
331            Err(e) => return capnp::capability::Promise::err(e),
332        };
333
334        capnp::capability::Promise::from_future(async move {
335            let exec_result = kernel.execute(&input).await.map_err(|e| {
336                capnp::Error::failed(format!("execution error: {}", e))
337            })?;
338
339            // Build the result
340            let mut result_builder = results.get().init_result();
341            result_builder.set_code(exec_result.code as i32);
342            result_builder.set_ok(exec_result.ok());
343            result_builder.set_err(&exec_result.err);
344            result_builder.set_stdout(exec_result.out.as_bytes());
345            result_builder.set_stderr(b"");
346
347            // Set data if present
348            if let Some(data) = &exec_result.data {
349                set_value(&mut result_builder.reborrow().init_data(), data);
350            }
351
352            // Set structured output data
353            if let Some(ref output) = exec_result.output {
354                set_output_data(&mut result_builder.reborrow().init_output(), output);
355            }
356
357            Ok(())
358        })
359    }
360
361    /// Get a variable value.
362    fn get_var(
363        &mut self,
364        params: kernel::GetVarParams,
365        mut results: kernel::GetVarResults,
366    ) -> capnp::capability::Promise<(), capnp::Error> {
367        let kernel = self.kernel.clone();
368
369        let name = match params.get() {
370            Ok(p) => match p.get_name() {
371                Ok(s) => match s.to_str() {
372                    Ok(s) => s.to_string(),
373                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
374                },
375                Err(e) => return capnp::capability::Promise::err(e),
376            },
377            Err(e) => return capnp::capability::Promise::err(e),
378        };
379
380        capnp::capability::Promise::from_future(async move {
381            if let Some(value) = kernel.get_var(&name).await {
382                set_value(&mut results.get().init_value(), &value);
383            }
384
385            Ok(())
386        })
387    }
388
389    /// Set a variable value.
390    fn set_var(
391        &mut self,
392        params: kernel::SetVarParams,
393        _results: kernel::SetVarResults,
394    ) -> capnp::capability::Promise<(), capnp::Error> {
395        let kernel = self.kernel.clone();
396
397        let (name, value) = match params.get() {
398            Ok(p) => {
399                let name = match p.get_name() {
400                    Ok(s) => match s.to_str() {
401                        Ok(s) => s.to_string(),
402                        Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
403                    },
404                    Err(e) => return capnp::capability::Promise::err(e),
405                };
406                let value_reader = match p.get_value() {
407                    Ok(v) => v,
408                    Err(e) => return capnp::capability::Promise::err(e),
409                };
410                let value = match read_value(&value_reader) {
411                    Ok(v) => v,
412                    Err(e) => return capnp::capability::Promise::err(e),
413                };
414                (name, value)
415            }
416            Err(e) => return capnp::capability::Promise::err(e),
417        };
418
419        capnp::capability::Promise::from_future(async move {
420            kernel.set_var(&name, value).await;
421            Ok(())
422        })
423    }
424
425    /// List all variables.
426    fn list_vars(
427        &mut self,
428        _params: kernel::ListVarsParams,
429        mut results: kernel::ListVarsResults,
430    ) -> capnp::capability::Promise<(), capnp::Error> {
431        let kernel = self.kernel.clone();
432
433        capnp::capability::Promise::from_future(async move {
434            let vars = kernel.list_vars().await;
435
436            let mut list = results.get().init_vars(vars.len() as u32);
437            for (i, (name, value)) in vars.into_iter().enumerate() {
438                let mut entry = list.reborrow().get(i as u32);
439                entry.set_key(&name);
440                set_value(&mut entry.init_value(), &value);
441            }
442
443            Ok(())
444        })
445    }
446
447    /// List available tools.
448    fn list_tools(
449        &mut self,
450        _params: kernel::ListToolsParams,
451        mut results: kernel::ListToolsResults,
452    ) -> capnp::capability::Promise<(), capnp::Error> {
453        let schemas = self.kernel.tool_schemas();
454
455        let mut list = results.get().init_tools(schemas.len() as u32);
456        for (i, schema) in schemas.into_iter().enumerate() {
457            let mut entry = list.reborrow().get(i as u32);
458            entry.set_name(&schema.name);
459            entry.set_description(&schema.description);
460            // source defaults to builtin
461        }
462
463        capnp::capability::Promise::ok(())
464    }
465
466    /// Ping the kernel (health check).
467    fn ping(
468        &mut self,
469        _params: kernel::PingParams,
470        mut results: kernel::PingResults,
471    ) -> capnp::capability::Promise<(), capnp::Error> {
472        results.get().set_pong("pong");
473        capnp::capability::Promise::ok(())
474    }
475
476    /// Shutdown the kernel.
477    ///
478    /// Currently logs the request and returns success. Actual process termination
479    /// is handled by the calling frontend (REPL, MCP server) after receiving the
480    /// successful response.
481    fn shutdown(
482        &mut self,
483        _params: kernel::ShutdownParams,
484        _results: kernel::ShutdownResults,
485    ) -> capnp::capability::Promise<(), capnp::Error> {
486        tracing::info!("Kernel shutdown requested via RPC");
487        capnp::capability::Promise::ok(())
488    }
489
490    /// Reset kernel state.
491    fn reset(
492        &mut self,
493        _params: kernel::ResetParams,
494        _results: kernel::ResetResults,
495    ) -> capnp::capability::Promise<(), capnp::Error> {
496        let kernel = self.kernel.clone();
497
498        capnp::capability::Promise::from_future(async move {
499            kernel.reset().await.map_err(|e| {
500                capnp::Error::failed(format!("reset error: {}", e))
501            })?;
502            Ok(())
503        })
504    }
505
506    // --- Working Directory ---
507
508    /// Get the current working directory.
509    fn get_cwd(
510        &mut self,
511        _params: kernel::GetCwdParams,
512        mut results: kernel::GetCwdResults,
513    ) -> capnp::capability::Promise<(), capnp::Error> {
514        let kernel = self.kernel.clone();
515
516        capnp::capability::Promise::from_future(async move {
517            let cwd = kernel.cwd().await;
518            results.get().set_path(cwd.to_string_lossy());
519            Ok(())
520        })
521    }
522
523    /// Set the current working directory.
524    fn set_cwd(
525        &mut self,
526        params: kernel::SetCwdParams,
527        mut results: kernel::SetCwdResults,
528    ) -> capnp::capability::Promise<(), capnp::Error> {
529        let kernel = self.kernel.clone();
530
531        let path = match params.get() {
532            Ok(p) => match p.get_path() {
533                Ok(s) => match s.to_str() {
534                    Ok(s) => s.to_string(),
535                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
536                },
537                Err(e) => return capnp::capability::Promise::err(e),
538            },
539            Err(e) => return capnp::capability::Promise::err(e),
540        };
541
542        capnp::capability::Promise::from_future(async move {
543            kernel.set_cwd(std::path::PathBuf::from(&path)).await;
544            let mut r = results.get();
545            r.set_success(true);
546            r.set_error("");
547            Ok(())
548        })
549    }
550
551    // --- Last Result ---
552
553    /// Get the last execution result ($?).
554    fn get_last_result(
555        &mut self,
556        _params: kernel::GetLastResultParams,
557        mut results: kernel::GetLastResultResults,
558    ) -> capnp::capability::Promise<(), capnp::Error> {
559        let kernel = self.kernel.clone();
560
561        capnp::capability::Promise::from_future(async move {
562            let exec_result = kernel.last_result().await;
563
564            let mut result_builder = results.get().init_result();
565            result_builder.set_code(exec_result.code as i32);
566            result_builder.set_ok(exec_result.ok());
567            result_builder.set_err(&exec_result.err);
568            result_builder.set_stdout(exec_result.out.as_bytes());
569            result_builder.set_stderr(b"");
570
571            // Set data if present
572            if let Some(data) = &exec_result.data {
573                set_value(&mut result_builder.reborrow().init_data(), data);
574            }
575
576            // Set structured output data
577            if let Some(ref output) = exec_result.output {
578                set_output_data(&mut result_builder.reborrow().init_output(), output);
579            }
580
581            Ok(())
582        })
583    }
584
585    // --- Placeholder implementations for remaining methods ---
586
587    fn execute_streaming(
588        &mut self,
589        _params: kernel::ExecuteStreamingParams,
590        _results: kernel::ExecuteStreamingResults,
591    ) -> capnp::capability::Promise<(), capnp::Error> {
592        capnp::capability::Promise::err(capnp::Error::unimplemented(
593            "execute_streaming not yet implemented".into(),
594        ))
595    }
596
597    fn call_tool(
598        &mut self,
599        _params: kernel::CallToolParams,
600        _results: kernel::CallToolResults,
601    ) -> capnp::capability::Promise<(), capnp::Error> {
602        capnp::capability::Promise::err(capnp::Error::unimplemented(
603            "call_tool not yet implemented".into(),
604        ))
605    }
606
607    fn mount(
608        &mut self,
609        _params: kernel::MountParams,
610        _results: kernel::MountResults,
611    ) -> capnp::capability::Promise<(), capnp::Error> {
612        capnp::capability::Promise::err(capnp::Error::unimplemented(
613            "mount not yet implemented".into(),
614        ))
615    }
616
617    fn unmount(
618        &mut self,
619        _params: kernel::UnmountParams,
620        _results: kernel::UnmountResults,
621    ) -> capnp::capability::Promise<(), capnp::Error> {
622        capnp::capability::Promise::err(capnp::Error::unimplemented(
623            "unmount not yet implemented".into(),
624        ))
625    }
626
627    fn list_mounts(
628        &mut self,
629        _params: kernel::ListMountsParams,
630        _results: kernel::ListMountsResults,
631    ) -> capnp::capability::Promise<(), capnp::Error> {
632        capnp::capability::Promise::err(capnp::Error::unimplemented(
633            "list_mounts not yet implemented".into(),
634        ))
635    }
636
637    fn register_mcp(
638        &mut self,
639        _params: kernel::RegisterMcpParams,
640        _results: kernel::RegisterMcpResults,
641    ) -> capnp::capability::Promise<(), capnp::Error> {
642        capnp::capability::Promise::err(capnp::Error::unimplemented(
643            "register_mcp not yet implemented".into(),
644        ))
645    }
646
647    fn unregister_mcp(
648        &mut self,
649        _params: kernel::UnregisterMcpParams,
650        _results: kernel::UnregisterMcpResults,
651    ) -> capnp::capability::Promise<(), capnp::Error> {
652        capnp::capability::Promise::err(capnp::Error::unimplemented(
653            "unregister_mcp not yet implemented".into(),
654        ))
655    }
656
657    fn list_mcp_servers(
658        &mut self,
659        _params: kernel::ListMcpServersParams,
660        _results: kernel::ListMcpServersResults,
661    ) -> capnp::capability::Promise<(), capnp::Error> {
662        capnp::capability::Promise::err(capnp::Error::unimplemented(
663            "list_mcp_servers not yet implemented".into(),
664        ))
665    }
666
667    fn snapshot(
668        &mut self,
669        _params: kernel::SnapshotParams,
670        _results: kernel::SnapshotResults,
671    ) -> capnp::capability::Promise<(), capnp::Error> {
672        capnp::capability::Promise::err(capnp::Error::unimplemented(
673            "snapshot not yet implemented".into(),
674        ))
675    }
676
677    fn restore(
678        &mut self,
679        _params: kernel::RestoreParams,
680        _results: kernel::RestoreResults,
681    ) -> capnp::capability::Promise<(), capnp::Error> {
682        capnp::capability::Promise::err(capnp::Error::unimplemented(
683            "restore not yet implemented".into(),
684        ))
685    }
686
687    fn read_blob(
688        &mut self,
689        params: kernel::ReadBlobParams,
690        mut results: kernel::ReadBlobResults,
691    ) -> capnp::capability::Promise<(), capnp::Error> {
692        let vfs = self.kernel.vfs();
693
694        let id = match params.get() {
695            Ok(p) => match p.get_id() {
696                Ok(s) => match s.to_str() {
697                    Ok(s) => s.to_string(),
698                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
699                },
700                Err(e) => return capnp::capability::Promise::err(e),
701            },
702            Err(e) => return capnp::capability::Promise::err(e),
703        };
704
705        capnp::capability::Promise::from_future(async move {
706            let path = PathBuf::from(format!("/v/blobs/{}", id));
707
708            // Read the blob data
709            let data = vfs.read(&path).await.map_err(|e| {
710                capnp::Error::failed(format!("failed to read blob {}: {}", id, e))
711            })?;
712
713            // Create the stream implementation
714            let stream_impl = BlobStreamImpl::new(data);
715            let stream_client: blob_stream::Client = capnp_rpc::new_client(stream_impl);
716
717            results.get().set_stream(stream_client);
718            Ok(())
719        })
720    }
721
722    fn write_blob(
723        &mut self,
724        params: kernel::WriteBlobParams,
725        mut results: kernel::WriteBlobResults,
726    ) -> capnp::capability::Promise<(), capnp::Error> {
727        let vfs = self.kernel.vfs();
728
729        let (content_type, _size) = match params.get() {
730            Ok(p) => {
731                let ct = match p.get_content_type() {
732                    Ok(s) => match s.to_str() {
733                        Ok(s) => s.to_string(),
734                        Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
735                    },
736                    Err(e) => return capnp::capability::Promise::err(e),
737                };
738                let size = p.get_size();
739                (ct, size)
740            },
741            Err(e) => return capnp::capability::Promise::err(e),
742        };
743
744        // Generate a unique blob ID
745        let id = generate_blob_id();
746        let path = PathBuf::from(format!("/v/blobs/{}", id));
747
748        // Store content type as metadata (could be extended later)
749        tracing::debug!("Creating blob {} with content type {}", id, content_type);
750
751        // Create the sink implementation
752        let sink_impl = BlobSinkImpl::new(vfs, path);
753        let sink_client: blob_sink::Client = capnp_rpc::new_client(sink_impl);
754
755        let mut builder = results.get();
756        builder.set_id(&id);
757        builder.set_stream(sink_client);
758
759        capnp::capability::Promise::ok(())
760    }
761
762    fn delete_blob(
763        &mut self,
764        params: kernel::DeleteBlobParams,
765        mut results: kernel::DeleteBlobResults,
766    ) -> capnp::capability::Promise<(), capnp::Error> {
767        let vfs = self.kernel.vfs();
768
769        let id = match params.get() {
770            Ok(p) => match p.get_id() {
771                Ok(s) => match s.to_str() {
772                    Ok(s) => s.to_string(),
773                    Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
774                },
775                Err(e) => return capnp::capability::Promise::err(e),
776            },
777            Err(e) => return capnp::capability::Promise::err(e),
778        };
779
780        capnp::capability::Promise::from_future(async move {
781            let path = PathBuf::from(format!("/v/blobs/{}", id));
782
783            // Delete the blob
784            let success = match vfs.remove(&path).await {
785                Ok(()) => true,
786                Err(e) => {
787                    tracing::warn!("Failed to delete blob {}: {}", id, e);
788                    false
789                }
790            };
791
792            results.get().set_success(success);
793            Ok(())
794        })
795    }
796}
797
798// ============================================================
799// Value Conversion Helpers
800// ============================================================
801
802use kaish_schema::value;
803use kaish_schema::{output_data, output_node};
804use crate::ast::Value;
805use crate::interpreter::{EntryType, OutputData, OutputNode};
806
807/// Convert a kaish Value to a Cap'n Proto Value.
808fn set_value(builder: &mut value::Builder<'_>, value: &Value) {
809    match value {
810        Value::Null => builder.set_null(()),
811        Value::Bool(b) => builder.set_bool(*b),
812        Value::Int(i) => builder.set_int(*i),
813        Value::Float(f) => builder.set_float(*f),
814        Value::String(s) => builder.set_string(s),
815        Value::Json(json) => {
816            // Serialize Json values using the Cap'n Proto array/object types
817            set_json_value(builder.reborrow(), json);
818        }
819        Value::Blob(blob) => {
820            let mut blob_builder = builder.reborrow().init_blob();
821            blob_builder.set_id(&blob.id);
822            blob_builder.set_size(blob.size);
823            blob_builder.set_content_type(&blob.content_type);
824            if let Some(hash) = &blob.hash {
825                blob_builder.set_hash(hash);
826            }
827        }
828    }
829}
830
831/// Helper to serialize serde_json::Value to Cap'n Proto.
832fn set_json_value(mut builder: value::Builder<'_>, json: &serde_json::Value) {
833    match json {
834        serde_json::Value::Null => builder.set_null(()),
835        serde_json::Value::Bool(b) => builder.set_bool(*b),
836        serde_json::Value::Number(n) => {
837            if let Some(i) = n.as_i64() {
838                builder.set_int(i);
839            } else if let Some(f) = n.as_f64() {
840                builder.set_float(f);
841            } else {
842                builder.set_string(n.to_string());
843            }
844        }
845        serde_json::Value::String(s) => builder.set_string(s),
846        serde_json::Value::Array(arr) => {
847            let mut array_builder = builder.init_array(arr.len() as u32);
848            for (i, item) in arr.iter().enumerate() {
849                set_json_value(array_builder.reborrow().get(i as u32), item);
850            }
851        }
852        serde_json::Value::Object(obj) => {
853            let mut object_builder = builder.init_object(obj.len() as u32);
854            for (i, (key, val)) in obj.iter().enumerate() {
855                let mut entry = object_builder.reborrow().get(i as u32);
856                entry.set_key(key);
857                set_json_value(entry.init_value(), val);
858            }
859        }
860    }
861}
862
863/// Convert a kaish OutputData to Cap'n Proto OutputData.
864fn set_output_data(builder: &mut output_data::Builder<'_>, output: &OutputData) {
865    // Set headers if present
866    if let Some(ref headers) = output.headers {
867        let mut headers_builder = builder.reborrow().init_headers(headers.len() as u32);
868        for (i, header) in headers.iter().enumerate() {
869            headers_builder.set(i as u32, header);
870        }
871    }
872
873    // Set root nodes
874    let mut root_builder = builder.reborrow().init_root(output.root.len() as u32);
875    for (i, node) in output.root.iter().enumerate() {
876        let mut node_builder = root_builder.reborrow().get(i as u32);
877        set_output_node(&mut node_builder, node);
878    }
879}
880
881/// Convert a kaish OutputNode to Cap'n Proto OutputNode.
882fn set_output_node(builder: &mut output_node::Builder<'_>, node: &OutputNode) {
883    builder.set_name(&node.name);
884
885    // Set entry type
886    use kaish_schema::kaish_capnp::EntryType as SchemaEntryType;
887    let entry_type = match node.entry_type {
888        EntryType::Text => SchemaEntryType::Text,
889        EntryType::File => SchemaEntryType::File,
890        EntryType::Directory => SchemaEntryType::Directory,
891        EntryType::Executable => SchemaEntryType::Executable,
892        EntryType::Symlink => SchemaEntryType::Symlink,
893    };
894    builder.set_entry_type(entry_type);
895
896    // Set text if present
897    if let Some(ref text) = node.text {
898        builder.set_text(text);
899    }
900
901    // Set cells
902    if !node.cells.is_empty() {
903        let mut cells_builder = builder.reborrow().init_cells(node.cells.len() as u32);
904        for (i, cell) in node.cells.iter().enumerate() {
905            cells_builder.set(i as u32, cell);
906        }
907    }
908
909    // Set children recursively
910    if !node.children.is_empty() {
911        let mut children_builder = builder.reborrow().init_children(node.children.len() as u32);
912        for (i, child) in node.children.iter().enumerate() {
913            let mut child_builder = children_builder.reborrow().get(i as u32);
914            set_output_node(&mut child_builder, child);
915        }
916    }
917}
918
919/// Read a kaish Value from a Cap'n Proto Value.
920///
921/// Arrays and objects are serialized as JSON strings.
922fn read_value(reader: &value::Reader<'_>) -> Result<Value, capnp::Error> {
923    use value::Which;
924    match reader.which()? {
925        Which::Null(()) => Ok(Value::Null),
926        Which::Bool(b) => Ok(Value::Bool(b)),
927        Which::Int(i) => Ok(Value::Int(i)),
928        Which::Float(f) => Ok(Value::Float(f)),
929        Which::String(s) => {
930            let text = s?;
931            let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
932            Ok(Value::String(string.to_string()))
933        }
934        Which::Array(arr) => {
935            // Convert array to JSON string
936            let arr = arr?;
937            let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
938            let json_array = serde_json::Value::Array(items?);
939            Ok(Value::String(json_array.to_string()))
940        }
941        Which::Object(obj) => {
942            // Convert object to JSON string
943            let obj = obj?;
944            let mut map = serde_json::Map::new();
945            for kv in obj.iter() {
946                let key_text = kv.get_key()?;
947                let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
948                let val = read_value_to_json(&kv.get_value()?)?;
949                map.insert(key, val);
950            }
951            Ok(Value::String(serde_json::Value::Object(map).to_string()))
952        }
953        Which::Blob(blob) => {
954            // Convert blob reference to JSON string representation
955            let blob = blob?;
956            let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
957            let size = blob.get_size();
958            let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
959            let hash = blob.get_hash()?;
960
961            let mut map = serde_json::Map::new();
962            map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
963            map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
964            map.insert("size".to_string(), serde_json::Value::Number(size.into()));
965            map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
966            if !hash.is_empty() {
967                let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
968                map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
969            }
970            Ok(Value::String(serde_json::Value::Object(map).to_string()))
971        }
972    }
973}
974
975/// Helper to convert Cap'n Proto Value to serde_json::Value
976fn read_value_to_json(reader: &value::Reader<'_>) -> Result<serde_json::Value, capnp::Error> {
977    use value::Which;
978    match reader.which()? {
979        Which::Null(()) => Ok(serde_json::Value::Null),
980        Which::Bool(b) => Ok(serde_json::Value::Bool(b)),
981        Which::Int(i) => Ok(serde_json::Value::Number(i.into())),
982        Which::Float(f) => Ok(serde_json::Number::from_f64(f)
983            .map(serde_json::Value::Number)
984            .unwrap_or(serde_json::Value::Null)),
985        Which::String(s) => {
986            let text = s?;
987            let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
988            Ok(serde_json::Value::String(string.to_string()))
989        }
990        Which::Array(arr) => {
991            let arr = arr?;
992            let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
993            Ok(serde_json::Value::Array(items?))
994        }
995        Which::Object(obj) => {
996            let obj = obj?;
997            let mut map = serde_json::Map::new();
998            for kv in obj.iter() {
999                let key_text = kv.get_key()?;
1000                let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
1001                let val = read_value_to_json(&kv.get_value()?)?;
1002                map.insert(key, val);
1003            }
1004            Ok(serde_json::Value::Object(map))
1005        }
1006        Which::Blob(blob) => {
1007            // Convert blob reference to JSON object
1008            let blob = blob?;
1009            let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1010            let size = blob.get_size();
1011            let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1012            let hash = blob.get_hash()?;
1013
1014            let mut map = serde_json::Map::new();
1015            map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
1016            map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
1017            map.insert("size".to_string(), serde_json::Value::Number(size.into()));
1018            map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
1019            if !hash.is_empty() {
1020                let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
1021                map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
1022            }
1023            Ok(serde_json::Value::Object(map))
1024        }
1025    }
1026}