1use std::path::{Path, PathBuf};
21use std::rc::Rc;
22use std::sync::Arc;
23
24use anyhow::{Context, Result};
25use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
26use futures::AsyncReadExt;
27use tokio::net::UnixListener;
28use tokio::sync::Mutex;
29use tokio_util::compat::TokioAsyncReadCompatExt;
30
31use kaish_schema::{blob_sink, blob_stream, kernel};
32
33use crate::kernel::Kernel;
34use crate::paths as state_paths;
35use crate::vfs::{Filesystem, VfsRouter};
36
37pub struct KernelRpcServer {
41 kernel: Arc<Kernel>,
42}
43
44impl KernelRpcServer {
45 pub fn new(kernel: Kernel) -> Self {
47 Self {
48 kernel: Arc::new(kernel),
49 }
50 }
51
52 pub async fn serve_default(&self) -> Result<()> {
56 let socket_path = state_paths::runtime_dir().join(format!("{}.sock", self.kernel.name()));
57 self.serve(&socket_path).await
58 }
59
60 pub async fn serve(&self, socket_path: &Path) -> Result<()> {
62 if let Some(parent) = socket_path.parent() {
64 std::fs::create_dir_all(parent).ok();
65 }
66
67 if socket_path.exists() {
69 std::fs::remove_file(socket_path)
70 .with_context(|| format!("removing old socket: {}", socket_path.display()))?;
71 }
72
73 let listener = UnixListener::bind(socket_path)
74 .with_context(|| format!("binding to socket: {}", socket_path.display()))?;
75
76 tracing::info!("Kernel RPC server listening on {}", socket_path.display());
77
78 loop {
79 let (stream, _addr) = listener.accept().await?;
80
81 let stream = stream.compat();
83
84 let (reader, writer) = stream.split();
85
86 let network = twoparty::VatNetwork::new(
88 reader,
89 writer,
90 rpc_twoparty_capnp::Side::Server,
91 Default::default(),
92 );
93
94 let kernel_impl = KernelImpl::new(self.kernel.clone());
96 let kernel_client: kernel::Client = capnp_rpc::new_client(kernel_impl);
97
98 let rpc_system = RpcSystem::new(Box::new(network), Some(kernel_client.clone().client));
100
101 tokio::task::spawn_local(async move {
103 if let Err(e) = rpc_system.await {
104 tracing::error!("RPC error: {}", e);
105 }
106 });
107 }
108 }
109}
110
111struct BlobStreamImpl {
117 data: Vec<u8>,
118 position: Mutex<usize>,
119}
120
121impl BlobStreamImpl {
122 fn new(data: Vec<u8>) -> Self {
123 Self {
124 data,
125 position: Mutex::new(0),
126 }
127 }
128}
129
130#[allow(refining_impl_trait)]
131impl blob_stream::Server for BlobStreamImpl {
132 fn read(
133 self: Rc<Self>,
134 params: blob_stream::ReadParams,
135 mut results: blob_stream::ReadResults,
136 ) -> capnp::capability::Promise<(), capnp::Error> {
137 let max_bytes = match params.get() {
138 Ok(p) => p.get_max_bytes() as usize,
139 Err(e) => return capnp::capability::Promise::err(e),
140 };
141
142 let (chunk, done) = tokio::task::block_in_place(|| {
144 let mut pos = self.position.blocking_lock();
145 let remaining = self.data.len().saturating_sub(*pos);
146 let read_size = max_bytes.min(remaining);
147 let chunk = self.data[*pos..*pos + read_size].to_vec();
148 *pos += read_size;
149 let done = *pos >= self.data.len();
150 (chunk, done)
151 });
152
153 let mut builder = results.get();
154 builder.set_data(&chunk);
155 builder.set_done(done);
156 capnp::capability::Promise::ok(())
157 }
158
159 fn size(
160 self: Rc<Self>,
161 _params: blob_stream::SizeParams,
162 mut results: blob_stream::SizeResults,
163 ) -> capnp::capability::Promise<(), capnp::Error> {
164 let mut builder = results.get();
165 builder.set_bytes(self.data.len() as u64);
166 builder.set_known(true);
167 capnp::capability::Promise::ok(())
168 }
169
170 fn cancel(
171 self: Rc<Self>,
172 _params: blob_stream::CancelParams,
173 _results: blob_stream::CancelResults,
174 ) -> capnp::capability::Promise<(), capnp::Error> {
175 capnp::capability::Promise::ok(())
177 }
178}
179
180struct BlobSinkImpl {
182 vfs: Arc<VfsRouter>,
183 path: PathBuf,
184 data: Mutex<Vec<u8>>,
185 aborted: Mutex<bool>,
186}
187
188impl BlobSinkImpl {
189 fn new(vfs: Arc<VfsRouter>, path: PathBuf) -> Self {
190 Self {
191 vfs,
192 path,
193 data: Mutex::new(Vec::new()),
194 aborted: Mutex::new(false),
195 }
196 }
197}
198
199#[allow(refining_impl_trait)]
200impl blob_sink::Server for BlobSinkImpl {
201 fn write(
202 self: Rc<Self>,
203 params: blob_sink::WriteParams,
204 _results: blob_sink::WriteResults,
205 ) -> capnp::capability::Promise<(), capnp::Error> {
206 let chunk = match params.get() {
207 Ok(p) => match p.get_data() {
208 Ok(d) => d.to_vec(),
209 Err(e) => return capnp::capability::Promise::err(e),
210 },
211 Err(e) => return capnp::capability::Promise::err(e),
212 };
213
214 tokio::task::block_in_place(|| {
215 let aborted = self.aborted.blocking_lock();
216 if *aborted {
217 return;
218 }
219 drop(aborted);
220
221 let mut data = self.data.blocking_lock();
222 data.extend(chunk);
223 });
224
225 capnp::capability::Promise::ok(())
226 }
227
228 fn finish(
229 self: Rc<Self>,
230 _params: blob_sink::FinishParams,
231 mut results: blob_sink::FinishResults,
232 ) -> capnp::capability::Promise<(), capnp::Error> {
233 let vfs = self.vfs.clone();
234 let path = self.path.clone();
235
236 let (data, hash) = tokio::task::block_in_place(|| {
238 let aborted = self.aborted.blocking_lock();
239 if *aborted {
240 return (Vec::new(), Vec::new());
241 }
242 drop(aborted);
243
244 let data = self.data.blocking_lock().clone();
245
246 use std::collections::hash_map::DefaultHasher;
248 use std::hash::{Hash, Hasher};
249 let mut hasher = DefaultHasher::new();
250 data.hash(&mut hasher);
251 let hash_value = hasher.finish();
252 let hash = hash_value.to_be_bytes().to_vec();
253
254 (data, hash)
255 });
256
257 capnp::capability::Promise::from_future(async move {
258 let parent = path.parent().unwrap_or(Path::new("/v/blobs"));
260 if let Err(e) = vfs.mkdir(parent).await {
261 if e.kind() != std::io::ErrorKind::AlreadyExists {
263 tracing::warn!("Failed to create blob directory: {}", e);
264 }
265 }
266
267 vfs.write(&path, &data).await.map_err(|e| {
269 capnp::Error::failed(format!("failed to write blob: {}", e))
270 })?;
271
272 results.get().set_hash(&hash);
273 Ok(())
274 })
275 }
276
277 fn abort(
278 self: Rc<Self>,
279 _params: blob_sink::AbortParams,
280 _results: blob_sink::AbortResults,
281 ) -> capnp::capability::Promise<(), capnp::Error> {
282 tokio::task::block_in_place(|| {
283 let mut aborted = self.aborted.blocking_lock();
284 *aborted = true;
285 });
286 capnp::capability::Promise::ok(())
287 }
288}
289
/// Produces a blob identifier of the form `<nanos-hex>-<sequence-hex>`.
///
/// The first part is the wall-clock time in nanoseconds since the Unix epoch
/// (`0` if the clock reads earlier than the epoch); the second is a counter
/// that increases by one on every call within this process.
fn generate_blob_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::SeqCst);

    format!("{:x}-{:x}", nanos, sequence)
}
305
306struct KernelImpl {
308 kernel: Arc<Kernel>,
309}
310
311impl KernelImpl {
312 fn new(kernel: Arc<Kernel>) -> Self {
313 Self { kernel }
314 }
315}
316
317#[allow(refining_impl_trait)]
318impl kernel::Server for KernelImpl {
319 fn execute(
321 self: Rc<Self>,
322 params: kernel::ExecuteParams,
323 mut results: kernel::ExecuteResults,
324 ) -> capnp::capability::Promise<(), capnp::Error> {
325 let kernel = self.kernel.clone();
326
327 let input = match params.get() {
328 Ok(p) => match p.get_input() {
329 Ok(s) => match s.to_str() {
330 Ok(s) => s.to_string(),
331 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
332 },
333 Err(e) => return capnp::capability::Promise::err(e),
334 },
335 Err(e) => return capnp::capability::Promise::err(e),
336 };
337
338 capnp::capability::Promise::from_future(async move {
339 let exec_result = kernel.execute(&input).await.map_err(|e| {
340 capnp::Error::failed(format!("execution error: {}", e))
341 })?;
342
343 let mut result_builder = results.get().init_result();
345 result_builder.set_code(exec_result.code as i32);
346 result_builder.set_ok(exec_result.ok());
347 result_builder.set_err(&exec_result.err);
348 result_builder.set_stdout(exec_result.out.as_bytes());
349 result_builder.set_stderr(b"");
350
351 if let Some(data) = &exec_result.data {
353 set_value(&mut result_builder.reborrow().init_data(), data);
354 }
355
356 if let Some(ref output) = exec_result.output {
358 set_output_data(&mut result_builder.reborrow().init_output(), output);
359 }
360
361 Ok(())
362 })
363 }
364
365 fn get_var(
367 self: Rc<Self>,
368 params: kernel::GetVarParams,
369 mut results: kernel::GetVarResults,
370 ) -> capnp::capability::Promise<(), capnp::Error> {
371 let kernel = self.kernel.clone();
372
373 let name = match params.get() {
374 Ok(p) => match p.get_name() {
375 Ok(s) => match s.to_str() {
376 Ok(s) => s.to_string(),
377 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
378 },
379 Err(e) => return capnp::capability::Promise::err(e),
380 },
381 Err(e) => return capnp::capability::Promise::err(e),
382 };
383
384 capnp::capability::Promise::from_future(async move {
385 if let Some(value) = kernel.get_var(&name).await {
386 set_value(&mut results.get().init_value(), &value);
387 }
388
389 Ok(())
390 })
391 }
392
393 fn set_var(
395 self: Rc<Self>,
396 params: kernel::SetVarParams,
397 _results: kernel::SetVarResults,
398 ) -> capnp::capability::Promise<(), capnp::Error> {
399 let kernel = self.kernel.clone();
400
401 let (name, value) = match params.get() {
402 Ok(p) => {
403 let name = match p.get_name() {
404 Ok(s) => match s.to_str() {
405 Ok(s) => s.to_string(),
406 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
407 },
408 Err(e) => return capnp::capability::Promise::err(e),
409 };
410 let value_reader = match p.get_value() {
411 Ok(v) => v,
412 Err(e) => return capnp::capability::Promise::err(e),
413 };
414 let value = match read_value(&value_reader) {
415 Ok(v) => v,
416 Err(e) => return capnp::capability::Promise::err(e),
417 };
418 (name, value)
419 }
420 Err(e) => return capnp::capability::Promise::err(e),
421 };
422
423 capnp::capability::Promise::from_future(async move {
424 kernel.set_var(&name, value).await;
425 Ok(())
426 })
427 }
428
429 fn list_vars(
431 self: Rc<Self>,
432 _params: kernel::ListVarsParams,
433 mut results: kernel::ListVarsResults,
434 ) -> capnp::capability::Promise<(), capnp::Error> {
435 let kernel = self.kernel.clone();
436
437 capnp::capability::Promise::from_future(async move {
438 let vars = kernel.list_vars().await;
439
440 let mut list = results.get().init_vars(vars.len() as u32);
441 for (i, (name, value)) in vars.into_iter().enumerate() {
442 let mut entry = list.reborrow().get(i as u32);
443 entry.set_key(&name);
444 set_value(&mut entry.init_value(), &value);
445 }
446
447 Ok(())
448 })
449 }
450
451 fn list_tools(
453 self: Rc<Self>,
454 _params: kernel::ListToolsParams,
455 mut results: kernel::ListToolsResults,
456 ) -> capnp::capability::Promise<(), capnp::Error> {
457 let schemas = self.kernel.tool_schemas();
458
459 let mut list = results.get().init_tools(schemas.len() as u32);
460 for (i, schema) in schemas.into_iter().enumerate() {
461 let mut entry = list.reborrow().get(i as u32);
462 entry.set_name(&schema.name);
463 entry.set_description(&schema.description);
464 }
466
467 capnp::capability::Promise::ok(())
468 }
469
470 fn ping(
472 self: Rc<Self>,
473 _params: kernel::PingParams,
474 mut results: kernel::PingResults,
475 ) -> capnp::capability::Promise<(), capnp::Error> {
476 results.get().set_pong("pong");
477 capnp::capability::Promise::ok(())
478 }
479
480 fn shutdown(
486 self: Rc<Self>,
487 _params: kernel::ShutdownParams,
488 _results: kernel::ShutdownResults,
489 ) -> capnp::capability::Promise<(), capnp::Error> {
490 tracing::info!("Kernel shutdown requested via RPC");
491 capnp::capability::Promise::ok(())
492 }
493
494 fn reset(
496 self: Rc<Self>,
497 _params: kernel::ResetParams,
498 _results: kernel::ResetResults,
499 ) -> capnp::capability::Promise<(), capnp::Error> {
500 let kernel = self.kernel.clone();
501
502 capnp::capability::Promise::from_future(async move {
503 kernel.reset().await.map_err(|e| {
504 capnp::Error::failed(format!("reset error: {}", e))
505 })?;
506 Ok(())
507 })
508 }
509
510 fn get_cwd(
514 self: Rc<Self>,
515 _params: kernel::GetCwdParams,
516 mut results: kernel::GetCwdResults,
517 ) -> capnp::capability::Promise<(), capnp::Error> {
518 let kernel = self.kernel.clone();
519
520 capnp::capability::Promise::from_future(async move {
521 let cwd = kernel.cwd().await;
522 results.get().set_path(cwd.to_string_lossy());
523 Ok(())
524 })
525 }
526
527 fn set_cwd(
529 self: Rc<Self>,
530 params: kernel::SetCwdParams,
531 mut results: kernel::SetCwdResults,
532 ) -> capnp::capability::Promise<(), capnp::Error> {
533 let kernel = self.kernel.clone();
534
535 let path = match params.get() {
536 Ok(p) => match p.get_path() {
537 Ok(s) => match s.to_str() {
538 Ok(s) => s.to_string(),
539 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
540 },
541 Err(e) => return capnp::capability::Promise::err(e),
542 },
543 Err(e) => return capnp::capability::Promise::err(e),
544 };
545
546 capnp::capability::Promise::from_future(async move {
547 kernel.set_cwd(std::path::PathBuf::from(&path)).await;
548 let mut r = results.get();
549 r.set_success(true);
550 r.set_error("");
551 Ok(())
552 })
553 }
554
555 fn get_last_result(
559 self: Rc<Self>,
560 _params: kernel::GetLastResultParams,
561 mut results: kernel::GetLastResultResults,
562 ) -> capnp::capability::Promise<(), capnp::Error> {
563 let kernel = self.kernel.clone();
564
565 capnp::capability::Promise::from_future(async move {
566 let exec_result = kernel.last_result().await;
567
568 let mut result_builder = results.get().init_result();
569 result_builder.set_code(exec_result.code as i32);
570 result_builder.set_ok(exec_result.ok());
571 result_builder.set_err(&exec_result.err);
572 result_builder.set_stdout(exec_result.out.as_bytes());
573 result_builder.set_stderr(b"");
574
575 if let Some(data) = &exec_result.data {
577 set_value(&mut result_builder.reborrow().init_data(), data);
578 }
579
580 if let Some(ref output) = exec_result.output {
582 set_output_data(&mut result_builder.reborrow().init_output(), output);
583 }
584
585 Ok(())
586 })
587 }
588
589 fn execute_streaming(
592 self: Rc<Self>,
593 _params: kernel::ExecuteStreamingParams,
594 _results: kernel::ExecuteStreamingResults,
595 ) -> capnp::capability::Promise<(), capnp::Error> {
596 capnp::capability::Promise::err(capnp::Error::unimplemented(
597 "execute_streaming not yet implemented".into(),
598 ))
599 }
600
601 fn call_tool(
602 self: Rc<Self>,
603 _params: kernel::CallToolParams,
604 _results: kernel::CallToolResults,
605 ) -> capnp::capability::Promise<(), capnp::Error> {
606 capnp::capability::Promise::err(capnp::Error::unimplemented(
607 "call_tool not yet implemented".into(),
608 ))
609 }
610
611 fn mount(
612 self: Rc<Self>,
613 _params: kernel::MountParams,
614 _results: kernel::MountResults,
615 ) -> capnp::capability::Promise<(), capnp::Error> {
616 capnp::capability::Promise::err(capnp::Error::unimplemented(
617 "mount not yet implemented".into(),
618 ))
619 }
620
621 fn unmount(
622 self: Rc<Self>,
623 _params: kernel::UnmountParams,
624 _results: kernel::UnmountResults,
625 ) -> capnp::capability::Promise<(), capnp::Error> {
626 capnp::capability::Promise::err(capnp::Error::unimplemented(
627 "unmount not yet implemented".into(),
628 ))
629 }
630
631 fn list_mounts(
632 self: Rc<Self>,
633 _params: kernel::ListMountsParams,
634 _results: kernel::ListMountsResults,
635 ) -> capnp::capability::Promise<(), capnp::Error> {
636 capnp::capability::Promise::err(capnp::Error::unimplemented(
637 "list_mounts not yet implemented".into(),
638 ))
639 }
640
641 fn register_mcp(
642 self: Rc<Self>,
643 _params: kernel::RegisterMcpParams,
644 _results: kernel::RegisterMcpResults,
645 ) -> capnp::capability::Promise<(), capnp::Error> {
646 capnp::capability::Promise::err(capnp::Error::unimplemented(
647 "register_mcp not yet implemented".into(),
648 ))
649 }
650
651 fn unregister_mcp(
652 self: Rc<Self>,
653 _params: kernel::UnregisterMcpParams,
654 _results: kernel::UnregisterMcpResults,
655 ) -> capnp::capability::Promise<(), capnp::Error> {
656 capnp::capability::Promise::err(capnp::Error::unimplemented(
657 "unregister_mcp not yet implemented".into(),
658 ))
659 }
660
661 fn list_mcp_servers(
662 self: Rc<Self>,
663 _params: kernel::ListMcpServersParams,
664 _results: kernel::ListMcpServersResults,
665 ) -> capnp::capability::Promise<(), capnp::Error> {
666 capnp::capability::Promise::err(capnp::Error::unimplemented(
667 "list_mcp_servers not yet implemented".into(),
668 ))
669 }
670
671 fn snapshot(
672 self: Rc<Self>,
673 _params: kernel::SnapshotParams,
674 _results: kernel::SnapshotResults,
675 ) -> capnp::capability::Promise<(), capnp::Error> {
676 capnp::capability::Promise::err(capnp::Error::unimplemented(
677 "snapshot not yet implemented".into(),
678 ))
679 }
680
681 fn restore(
682 self: Rc<Self>,
683 _params: kernel::RestoreParams,
684 _results: kernel::RestoreResults,
685 ) -> capnp::capability::Promise<(), capnp::Error> {
686 capnp::capability::Promise::err(capnp::Error::unimplemented(
687 "restore not yet implemented".into(),
688 ))
689 }
690
691 fn read_blob(
692 self: Rc<Self>,
693 params: kernel::ReadBlobParams,
694 mut results: kernel::ReadBlobResults,
695 ) -> capnp::capability::Promise<(), capnp::Error> {
696 let vfs = self.kernel.vfs();
697
698 let id = match params.get() {
699 Ok(p) => match p.get_id() {
700 Ok(s) => match s.to_str() {
701 Ok(s) => s.to_string(),
702 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
703 },
704 Err(e) => return capnp::capability::Promise::err(e),
705 },
706 Err(e) => return capnp::capability::Promise::err(e),
707 };
708
709 capnp::capability::Promise::from_future(async move {
710 let path = PathBuf::from(format!("/v/blobs/{}", id));
711
712 let data = vfs.read(&path).await.map_err(|e| {
714 capnp::Error::failed(format!("failed to read blob {}: {}", id, e))
715 })?;
716
717 let stream_impl = BlobStreamImpl::new(data);
719 let stream_client: blob_stream::Client = capnp_rpc::new_client(stream_impl);
720
721 results.get().set_stream(stream_client);
722 Ok(())
723 })
724 }
725
726 fn write_blob(
727 self: Rc<Self>,
728 params: kernel::WriteBlobParams,
729 mut results: kernel::WriteBlobResults,
730 ) -> capnp::capability::Promise<(), capnp::Error> {
731 let vfs = self.kernel.vfs();
732
733 let (content_type, _size) = match params.get() {
734 Ok(p) => {
735 let ct = match p.get_content_type() {
736 Ok(s) => match s.to_str() {
737 Ok(s) => s.to_string(),
738 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
739 },
740 Err(e) => return capnp::capability::Promise::err(e),
741 };
742 let size = p.get_size();
743 (ct, size)
744 },
745 Err(e) => return capnp::capability::Promise::err(e),
746 };
747
748 let id = generate_blob_id();
750 let path = PathBuf::from(format!("/v/blobs/{}", id));
751
752 tracing::debug!("Creating blob {} with content type {}", id, content_type);
754
755 let sink_impl = BlobSinkImpl::new(vfs, path);
757 let sink_client: blob_sink::Client = capnp_rpc::new_client(sink_impl);
758
759 let mut builder = results.get();
760 builder.set_id(&id);
761 builder.set_stream(sink_client);
762
763 capnp::capability::Promise::ok(())
764 }
765
766 fn delete_blob(
767 self: Rc<Self>,
768 params: kernel::DeleteBlobParams,
769 mut results: kernel::DeleteBlobResults,
770 ) -> capnp::capability::Promise<(), capnp::Error> {
771 let vfs = self.kernel.vfs();
772
773 let id = match params.get() {
774 Ok(p) => match p.get_id() {
775 Ok(s) => match s.to_str() {
776 Ok(s) => s.to_string(),
777 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
778 },
779 Err(e) => return capnp::capability::Promise::err(e),
780 },
781 Err(e) => return capnp::capability::Promise::err(e),
782 };
783
784 capnp::capability::Promise::from_future(async move {
785 let path = PathBuf::from(format!("/v/blobs/{}", id));
786
787 let success = match vfs.remove(&path).await {
789 Ok(()) => true,
790 Err(e) => {
791 tracing::warn!("Failed to delete blob {}: {}", id, e);
792 false
793 }
794 };
795
796 results.get().set_success(success);
797 Ok(())
798 })
799 }
800}
801
802use kaish_schema::value;
807use kaish_schema::{output_data, output_node};
808use crate::ast::Value;
809use crate::interpreter::{EntryType, OutputData, OutputNode};
810
811fn set_value(builder: &mut value::Builder<'_>, value: &Value) {
813 match value {
814 Value::Null => builder.set_null(()),
815 Value::Bool(b) => builder.set_bool(*b),
816 Value::Int(i) => builder.set_int(*i),
817 Value::Float(f) => builder.set_float(*f),
818 Value::String(s) => builder.set_string(s),
819 Value::Json(json) => {
820 set_json_value(builder.reborrow(), json);
822 }
823 Value::Blob(blob) => {
824 let mut blob_builder = builder.reborrow().init_blob();
825 blob_builder.set_id(&blob.id);
826 blob_builder.set_size(blob.size);
827 blob_builder.set_content_type(&blob.content_type);
828 if let Some(hash) = &blob.hash {
829 blob_builder.set_hash(hash);
830 }
831 }
832 }
833}
834
835fn set_json_value(mut builder: value::Builder<'_>, json: &serde_json::Value) {
837 match json {
838 serde_json::Value::Null => builder.set_null(()),
839 serde_json::Value::Bool(b) => builder.set_bool(*b),
840 serde_json::Value::Number(n) => {
841 if let Some(i) = n.as_i64() {
842 builder.set_int(i);
843 } else if let Some(f) = n.as_f64() {
844 builder.set_float(f);
845 } else {
846 builder.set_string(n.to_string());
847 }
848 }
849 serde_json::Value::String(s) => builder.set_string(s),
850 serde_json::Value::Array(arr) => {
851 let mut array_builder = builder.init_array(arr.len() as u32);
852 for (i, item) in arr.iter().enumerate() {
853 set_json_value(array_builder.reborrow().get(i as u32), item);
854 }
855 }
856 serde_json::Value::Object(obj) => {
857 let mut object_builder = builder.init_object(obj.len() as u32);
858 for (i, (key, val)) in obj.iter().enumerate() {
859 let mut entry = object_builder.reborrow().get(i as u32);
860 entry.set_key(key);
861 set_json_value(entry.init_value(), val);
862 }
863 }
864 }
865}
866
867fn set_output_data(builder: &mut output_data::Builder<'_>, output: &OutputData) {
869 if let Some(ref headers) = output.headers {
871 let mut headers_builder = builder.reborrow().init_headers(headers.len() as u32);
872 for (i, header) in headers.iter().enumerate() {
873 headers_builder.set(i as u32, header);
874 }
875 }
876
877 let mut root_builder = builder.reborrow().init_root(output.root.len() as u32);
879 for (i, node) in output.root.iter().enumerate() {
880 let mut node_builder = root_builder.reborrow().get(i as u32);
881 set_output_node(&mut node_builder, node);
882 }
883}
884
885fn set_output_node(builder: &mut output_node::Builder<'_>, node: &OutputNode) {
887 builder.set_name(&node.name);
888
889 use kaish_schema::kaish_capnp::EntryType as SchemaEntryType;
891 let entry_type = match node.entry_type {
892 EntryType::Text => SchemaEntryType::Text,
893 EntryType::File => SchemaEntryType::File,
894 EntryType::Directory => SchemaEntryType::Directory,
895 EntryType::Executable => SchemaEntryType::Executable,
896 EntryType::Symlink => SchemaEntryType::Symlink,
897 };
898 builder.set_entry_type(entry_type);
899
900 if let Some(ref text) = node.text {
902 builder.set_text(text);
903 }
904
905 if !node.cells.is_empty() {
907 let mut cells_builder = builder.reborrow().init_cells(node.cells.len() as u32);
908 for (i, cell) in node.cells.iter().enumerate() {
909 cells_builder.set(i as u32, cell);
910 }
911 }
912
913 if !node.children.is_empty() {
915 let mut children_builder = builder.reborrow().init_children(node.children.len() as u32);
916 for (i, child) in node.children.iter().enumerate() {
917 let mut child_builder = children_builder.reborrow().get(i as u32);
918 set_output_node(&mut child_builder, child);
919 }
920 }
921}
922
923fn read_value(reader: &value::Reader<'_>) -> Result<Value, capnp::Error> {
927 use value::Which;
928 match reader.which()? {
929 Which::Null(()) => Ok(Value::Null),
930 Which::Bool(b) => Ok(Value::Bool(b)),
931 Which::Int(i) => Ok(Value::Int(i)),
932 Which::Float(f) => Ok(Value::Float(f)),
933 Which::String(s) => {
934 let text = s?;
935 let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
936 Ok(Value::String(string.to_string()))
937 }
938 Which::Array(arr) => {
939 let arr = arr?;
941 let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
942 let json_array = serde_json::Value::Array(items?);
943 Ok(Value::String(json_array.to_string()))
944 }
945 Which::Object(obj) => {
946 let obj = obj?;
948 let mut map = serde_json::Map::new();
949 for kv in obj.iter() {
950 let key_text = kv.get_key()?;
951 let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
952 let val = read_value_to_json(&kv.get_value()?)?;
953 map.insert(key, val);
954 }
955 Ok(Value::String(serde_json::Value::Object(map).to_string()))
956 }
957 Which::Blob(blob) => {
958 let blob = blob?;
960 let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
961 let size = blob.get_size();
962 let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
963 let hash = blob.get_hash()?;
964
965 let mut map = serde_json::Map::new();
966 map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
967 map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
968 map.insert("size".to_string(), serde_json::Value::Number(size.into()));
969 map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
970 if !hash.is_empty() {
971 let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
972 map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
973 }
974 Ok(Value::String(serde_json::Value::Object(map).to_string()))
975 }
976 }
977}
978
979fn read_value_to_json(reader: &value::Reader<'_>) -> Result<serde_json::Value, capnp::Error> {
981 use value::Which;
982 match reader.which()? {
983 Which::Null(()) => Ok(serde_json::Value::Null),
984 Which::Bool(b) => Ok(serde_json::Value::Bool(b)),
985 Which::Int(i) => Ok(serde_json::Value::Number(i.into())),
986 Which::Float(f) => Ok(serde_json::Number::from_f64(f)
987 .map(serde_json::Value::Number)
988 .unwrap_or(serde_json::Value::Null)),
989 Which::String(s) => {
990 let text = s?;
991 let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
992 Ok(serde_json::Value::String(string.to_string()))
993 }
994 Which::Array(arr) => {
995 let arr = arr?;
996 let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
997 Ok(serde_json::Value::Array(items?))
998 }
999 Which::Object(obj) => {
1000 let obj = obj?;
1001 let mut map = serde_json::Map::new();
1002 for kv in obj.iter() {
1003 let key_text = kv.get_key()?;
1004 let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
1005 let val = read_value_to_json(&kv.get_value()?)?;
1006 map.insert(key, val);
1007 }
1008 Ok(serde_json::Value::Object(map))
1009 }
1010 Which::Blob(blob) => {
1011 let blob = blob?;
1013 let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1014 let size = blob.get_size();
1015 let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1016 let hash = blob.get_hash()?;
1017
1018 let mut map = serde_json::Map::new();
1019 map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
1020 map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
1021 map.insert("size".to_string(), serde_json::Value::Number(size.into()));
1022 map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
1023 if !hash.is_empty() {
1024 let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
1025 map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
1026 }
1027 Ok(serde_json::Value::Object(map))
1028 }
1029 }
1030}