1use std::path::{Path, PathBuf};
21use std::sync::Arc;
22
23use anyhow::{Context, Result};
24use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
25use futures::AsyncReadExt;
26use tokio::net::UnixListener;
27use tokio::sync::Mutex;
28use tokio_util::compat::TokioAsyncReadCompatExt;
29
30use kaish_schema::{blob_sink, blob_stream, kernel};
31
32use crate::kernel::Kernel;
33use crate::paths as state_paths;
34use crate::vfs::{Filesystem, VfsRouter};
35
36pub struct KernelRpcServer {
40 kernel: Arc<Kernel>,
41}
42
43impl KernelRpcServer {
44 pub fn new(kernel: Kernel) -> Self {
46 Self {
47 kernel: Arc::new(kernel),
48 }
49 }
50
51 pub async fn serve_default(&self) -> Result<()> {
55 let socket_path = state_paths::runtime_dir().join(format!("{}.sock", self.kernel.name()));
56 self.serve(&socket_path).await
57 }
58
59 pub async fn serve(&self, socket_path: &Path) -> Result<()> {
61 if let Some(parent) = socket_path.parent() {
63 std::fs::create_dir_all(parent).ok();
64 }
65
66 if socket_path.exists() {
68 std::fs::remove_file(socket_path)
69 .with_context(|| format!("removing old socket: {}", socket_path.display()))?;
70 }
71
72 let listener = UnixListener::bind(socket_path)
73 .with_context(|| format!("binding to socket: {}", socket_path.display()))?;
74
75 tracing::info!("Kernel RPC server listening on {}", socket_path.display());
76
77 loop {
78 let (stream, _addr) = listener.accept().await?;
79
80 let stream = stream.compat();
82
83 let (reader, writer) = stream.split();
84
85 let network = twoparty::VatNetwork::new(
87 reader,
88 writer,
89 rpc_twoparty_capnp::Side::Server,
90 Default::default(),
91 );
92
93 let kernel_impl = KernelImpl::new(self.kernel.clone());
95 let kernel_client: kernel::Client = capnp_rpc::new_client(kernel_impl);
96
97 let rpc_system = RpcSystem::new(Box::new(network), Some(kernel_client.clone().client));
99
100 tokio::task::spawn_local(async move {
102 if let Err(e) = rpc_system.await {
103 tracing::error!("RPC error: {}", e);
104 }
105 });
106 }
107 }
108}
109
110struct BlobStreamImpl {
116 data: Vec<u8>,
117 position: Mutex<usize>,
118}
119
120impl BlobStreamImpl {
121 fn new(data: Vec<u8>) -> Self {
122 Self {
123 data,
124 position: Mutex::new(0),
125 }
126 }
127}
128
129impl blob_stream::Server for BlobStreamImpl {
130 fn read(
131 &mut self,
132 params: blob_stream::ReadParams,
133 mut results: blob_stream::ReadResults,
134 ) -> capnp::capability::Promise<(), capnp::Error> {
135 let max_bytes = match params.get() {
136 Ok(p) => p.get_max_bytes() as usize,
137 Err(e) => return capnp::capability::Promise::err(e),
138 };
139
140 let (chunk, done) = tokio::task::block_in_place(|| {
142 let mut pos = self.position.blocking_lock();
143 let remaining = self.data.len().saturating_sub(*pos);
144 let read_size = max_bytes.min(remaining);
145 let chunk = self.data[*pos..*pos + read_size].to_vec();
146 *pos += read_size;
147 let done = *pos >= self.data.len();
148 (chunk, done)
149 });
150
151 let mut builder = results.get();
152 builder.set_data(&chunk);
153 builder.set_done(done);
154 capnp::capability::Promise::ok(())
155 }
156
157 fn size(
158 &mut self,
159 _params: blob_stream::SizeParams,
160 mut results: blob_stream::SizeResults,
161 ) -> capnp::capability::Promise<(), capnp::Error> {
162 let mut builder = results.get();
163 builder.set_bytes(self.data.len() as u64);
164 builder.set_known(true);
165 capnp::capability::Promise::ok(())
166 }
167
168 fn cancel(
169 &mut self,
170 _params: blob_stream::CancelParams,
171 _results: blob_stream::CancelResults,
172 ) -> capnp::capability::Promise<(), capnp::Error> {
173 capnp::capability::Promise::ok(())
175 }
176}
177
178struct BlobSinkImpl {
180 vfs: Arc<VfsRouter>,
181 path: PathBuf,
182 data: Mutex<Vec<u8>>,
183 aborted: Mutex<bool>,
184}
185
186impl BlobSinkImpl {
187 fn new(vfs: Arc<VfsRouter>, path: PathBuf) -> Self {
188 Self {
189 vfs,
190 path,
191 data: Mutex::new(Vec::new()),
192 aborted: Mutex::new(false),
193 }
194 }
195}
196
197impl blob_sink::Server for BlobSinkImpl {
198 fn write(
199 &mut self,
200 params: blob_sink::WriteParams,
201 _results: blob_sink::WriteResults,
202 ) -> capnp::capability::Promise<(), capnp::Error> {
203 let chunk = match params.get() {
204 Ok(p) => match p.get_data() {
205 Ok(d) => d.to_vec(),
206 Err(e) => return capnp::capability::Promise::err(e),
207 },
208 Err(e) => return capnp::capability::Promise::err(e),
209 };
210
211 tokio::task::block_in_place(|| {
212 let aborted = self.aborted.blocking_lock();
213 if *aborted {
214 return;
215 }
216 drop(aborted);
217
218 let mut data = self.data.blocking_lock();
219 data.extend(chunk);
220 });
221
222 capnp::capability::Promise::ok(())
223 }
224
225 fn finish(
226 &mut self,
227 _params: blob_sink::FinishParams,
228 mut results: blob_sink::FinishResults,
229 ) -> capnp::capability::Promise<(), capnp::Error> {
230 let vfs = self.vfs.clone();
231 let path = self.path.clone();
232
233 let (data, hash) = tokio::task::block_in_place(|| {
235 let aborted = self.aborted.blocking_lock();
236 if *aborted {
237 return (Vec::new(), Vec::new());
238 }
239 drop(aborted);
240
241 let data = self.data.blocking_lock().clone();
242
243 use std::collections::hash_map::DefaultHasher;
245 use std::hash::{Hash, Hasher};
246 let mut hasher = DefaultHasher::new();
247 data.hash(&mut hasher);
248 let hash_value = hasher.finish();
249 let hash = hash_value.to_be_bytes().to_vec();
250
251 (data, hash)
252 });
253
254 capnp::capability::Promise::from_future(async move {
255 let parent = path.parent().unwrap_or(Path::new("/v/blobs"));
257 if let Err(e) = vfs.mkdir(parent).await {
258 if e.kind() != std::io::ErrorKind::AlreadyExists {
260 tracing::warn!("Failed to create blob directory: {}", e);
261 }
262 }
263
264 vfs.write(&path, &data).await.map_err(|e| {
266 capnp::Error::failed(format!("failed to write blob: {}", e))
267 })?;
268
269 results.get().set_hash(&hash);
270 Ok(())
271 })
272 }
273
274 fn abort(
275 &mut self,
276 _params: blob_sink::AbortParams,
277 _results: blob_sink::AbortResults,
278 ) -> capnp::capability::Promise<(), capnp::Error> {
279 tokio::task::block_in_place(|| {
280 let mut aborted = self.aborted.blocking_lock();
281 *aborted = true;
282 });
283 capnp::capability::Promise::ok(())
284 }
285}
286
/// Produces a blob id of the form `<hex nanoseconds since the Unix epoch>-<hex counter>`.
///
/// The counter is process-wide and incremented on every call, so two ids generated
/// by the same process never collide even if the clock reads the same value. A
/// clock set before the epoch contributes `0` as the timestamp part.
fn generate_blob_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static NEXT_SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let sequence = NEXT_SEQUENCE.fetch_add(1, Ordering::SeqCst);

    format!("{:x}-{:x}", nanos, sequence)
}
302
303struct KernelImpl {
305 kernel: Arc<Kernel>,
306}
307
308impl KernelImpl {
309 fn new(kernel: Arc<Kernel>) -> Self {
310 Self { kernel }
311 }
312}
313
314impl kernel::Server for KernelImpl {
315 fn execute(
317 &mut self,
318 params: kernel::ExecuteParams,
319 mut results: kernel::ExecuteResults,
320 ) -> capnp::capability::Promise<(), capnp::Error> {
321 let kernel = self.kernel.clone();
322
323 let input = match params.get() {
324 Ok(p) => match p.get_input() {
325 Ok(s) => match s.to_str() {
326 Ok(s) => s.to_string(),
327 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
328 },
329 Err(e) => return capnp::capability::Promise::err(e),
330 },
331 Err(e) => return capnp::capability::Promise::err(e),
332 };
333
334 capnp::capability::Promise::from_future(async move {
335 let exec_result = kernel.execute(&input).await.map_err(|e| {
336 capnp::Error::failed(format!("execution error: {}", e))
337 })?;
338
339 let mut result_builder = results.get().init_result();
341 result_builder.set_code(exec_result.code as i32);
342 result_builder.set_ok(exec_result.ok());
343 result_builder.set_err(&exec_result.err);
344 result_builder.set_stdout(exec_result.out.as_bytes());
345 result_builder.set_stderr(b"");
346
347 if let Some(data) = &exec_result.data {
349 set_value(&mut result_builder.reborrow().init_data(), data);
350 }
351
352 if let Some(ref output) = exec_result.output {
354 set_output_data(&mut result_builder.reborrow().init_output(), output);
355 }
356
357 Ok(())
358 })
359 }
360
361 fn get_var(
363 &mut self,
364 params: kernel::GetVarParams,
365 mut results: kernel::GetVarResults,
366 ) -> capnp::capability::Promise<(), capnp::Error> {
367 let kernel = self.kernel.clone();
368
369 let name = match params.get() {
370 Ok(p) => match p.get_name() {
371 Ok(s) => match s.to_str() {
372 Ok(s) => s.to_string(),
373 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
374 },
375 Err(e) => return capnp::capability::Promise::err(e),
376 },
377 Err(e) => return capnp::capability::Promise::err(e),
378 };
379
380 capnp::capability::Promise::from_future(async move {
381 if let Some(value) = kernel.get_var(&name).await {
382 set_value(&mut results.get().init_value(), &value);
383 }
384
385 Ok(())
386 })
387 }
388
389 fn set_var(
391 &mut self,
392 params: kernel::SetVarParams,
393 _results: kernel::SetVarResults,
394 ) -> capnp::capability::Promise<(), capnp::Error> {
395 let kernel = self.kernel.clone();
396
397 let (name, value) = match params.get() {
398 Ok(p) => {
399 let name = match p.get_name() {
400 Ok(s) => match s.to_str() {
401 Ok(s) => s.to_string(),
402 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
403 },
404 Err(e) => return capnp::capability::Promise::err(e),
405 };
406 let value_reader = match p.get_value() {
407 Ok(v) => v,
408 Err(e) => return capnp::capability::Promise::err(e),
409 };
410 let value = match read_value(&value_reader) {
411 Ok(v) => v,
412 Err(e) => return capnp::capability::Promise::err(e),
413 };
414 (name, value)
415 }
416 Err(e) => return capnp::capability::Promise::err(e),
417 };
418
419 capnp::capability::Promise::from_future(async move {
420 kernel.set_var(&name, value).await;
421 Ok(())
422 })
423 }
424
425 fn list_vars(
427 &mut self,
428 _params: kernel::ListVarsParams,
429 mut results: kernel::ListVarsResults,
430 ) -> capnp::capability::Promise<(), capnp::Error> {
431 let kernel = self.kernel.clone();
432
433 capnp::capability::Promise::from_future(async move {
434 let vars = kernel.list_vars().await;
435
436 let mut list = results.get().init_vars(vars.len() as u32);
437 for (i, (name, value)) in vars.into_iter().enumerate() {
438 let mut entry = list.reborrow().get(i as u32);
439 entry.set_key(&name);
440 set_value(&mut entry.init_value(), &value);
441 }
442
443 Ok(())
444 })
445 }
446
447 fn list_tools(
449 &mut self,
450 _params: kernel::ListToolsParams,
451 mut results: kernel::ListToolsResults,
452 ) -> capnp::capability::Promise<(), capnp::Error> {
453 let schemas = self.kernel.tool_schemas();
454
455 let mut list = results.get().init_tools(schemas.len() as u32);
456 for (i, schema) in schemas.into_iter().enumerate() {
457 let mut entry = list.reborrow().get(i as u32);
458 entry.set_name(&schema.name);
459 entry.set_description(&schema.description);
460 }
462
463 capnp::capability::Promise::ok(())
464 }
465
466 fn ping(
468 &mut self,
469 _params: kernel::PingParams,
470 mut results: kernel::PingResults,
471 ) -> capnp::capability::Promise<(), capnp::Error> {
472 results.get().set_pong("pong");
473 capnp::capability::Promise::ok(())
474 }
475
476 fn shutdown(
482 &mut self,
483 _params: kernel::ShutdownParams,
484 _results: kernel::ShutdownResults,
485 ) -> capnp::capability::Promise<(), capnp::Error> {
486 tracing::info!("Kernel shutdown requested via RPC");
487 capnp::capability::Promise::ok(())
488 }
489
490 fn reset(
492 &mut self,
493 _params: kernel::ResetParams,
494 _results: kernel::ResetResults,
495 ) -> capnp::capability::Promise<(), capnp::Error> {
496 let kernel = self.kernel.clone();
497
498 capnp::capability::Promise::from_future(async move {
499 kernel.reset().await.map_err(|e| {
500 capnp::Error::failed(format!("reset error: {}", e))
501 })?;
502 Ok(())
503 })
504 }
505
506 fn get_cwd(
510 &mut self,
511 _params: kernel::GetCwdParams,
512 mut results: kernel::GetCwdResults,
513 ) -> capnp::capability::Promise<(), capnp::Error> {
514 let kernel = self.kernel.clone();
515
516 capnp::capability::Promise::from_future(async move {
517 let cwd = kernel.cwd().await;
518 results.get().set_path(cwd.to_string_lossy());
519 Ok(())
520 })
521 }
522
523 fn set_cwd(
525 &mut self,
526 params: kernel::SetCwdParams,
527 mut results: kernel::SetCwdResults,
528 ) -> capnp::capability::Promise<(), capnp::Error> {
529 let kernel = self.kernel.clone();
530
531 let path = match params.get() {
532 Ok(p) => match p.get_path() {
533 Ok(s) => match s.to_str() {
534 Ok(s) => s.to_string(),
535 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
536 },
537 Err(e) => return capnp::capability::Promise::err(e),
538 },
539 Err(e) => return capnp::capability::Promise::err(e),
540 };
541
542 capnp::capability::Promise::from_future(async move {
543 kernel.set_cwd(std::path::PathBuf::from(&path)).await;
544 let mut r = results.get();
545 r.set_success(true);
546 r.set_error("");
547 Ok(())
548 })
549 }
550
551 fn get_last_result(
555 &mut self,
556 _params: kernel::GetLastResultParams,
557 mut results: kernel::GetLastResultResults,
558 ) -> capnp::capability::Promise<(), capnp::Error> {
559 let kernel = self.kernel.clone();
560
561 capnp::capability::Promise::from_future(async move {
562 let exec_result = kernel.last_result().await;
563
564 let mut result_builder = results.get().init_result();
565 result_builder.set_code(exec_result.code as i32);
566 result_builder.set_ok(exec_result.ok());
567 result_builder.set_err(&exec_result.err);
568 result_builder.set_stdout(exec_result.out.as_bytes());
569 result_builder.set_stderr(b"");
570
571 if let Some(data) = &exec_result.data {
573 set_value(&mut result_builder.reborrow().init_data(), data);
574 }
575
576 if let Some(ref output) = exec_result.output {
578 set_output_data(&mut result_builder.reborrow().init_output(), output);
579 }
580
581 Ok(())
582 })
583 }
584
585 fn execute_streaming(
588 &mut self,
589 _params: kernel::ExecuteStreamingParams,
590 _results: kernel::ExecuteStreamingResults,
591 ) -> capnp::capability::Promise<(), capnp::Error> {
592 capnp::capability::Promise::err(capnp::Error::unimplemented(
593 "execute_streaming not yet implemented".into(),
594 ))
595 }
596
597 fn call_tool(
598 &mut self,
599 _params: kernel::CallToolParams,
600 _results: kernel::CallToolResults,
601 ) -> capnp::capability::Promise<(), capnp::Error> {
602 capnp::capability::Promise::err(capnp::Error::unimplemented(
603 "call_tool not yet implemented".into(),
604 ))
605 }
606
607 fn mount(
608 &mut self,
609 _params: kernel::MountParams,
610 _results: kernel::MountResults,
611 ) -> capnp::capability::Promise<(), capnp::Error> {
612 capnp::capability::Promise::err(capnp::Error::unimplemented(
613 "mount not yet implemented".into(),
614 ))
615 }
616
617 fn unmount(
618 &mut self,
619 _params: kernel::UnmountParams,
620 _results: kernel::UnmountResults,
621 ) -> capnp::capability::Promise<(), capnp::Error> {
622 capnp::capability::Promise::err(capnp::Error::unimplemented(
623 "unmount not yet implemented".into(),
624 ))
625 }
626
627 fn list_mounts(
628 &mut self,
629 _params: kernel::ListMountsParams,
630 _results: kernel::ListMountsResults,
631 ) -> capnp::capability::Promise<(), capnp::Error> {
632 capnp::capability::Promise::err(capnp::Error::unimplemented(
633 "list_mounts not yet implemented".into(),
634 ))
635 }
636
637 fn register_mcp(
638 &mut self,
639 _params: kernel::RegisterMcpParams,
640 _results: kernel::RegisterMcpResults,
641 ) -> capnp::capability::Promise<(), capnp::Error> {
642 capnp::capability::Promise::err(capnp::Error::unimplemented(
643 "register_mcp not yet implemented".into(),
644 ))
645 }
646
647 fn unregister_mcp(
648 &mut self,
649 _params: kernel::UnregisterMcpParams,
650 _results: kernel::UnregisterMcpResults,
651 ) -> capnp::capability::Promise<(), capnp::Error> {
652 capnp::capability::Promise::err(capnp::Error::unimplemented(
653 "unregister_mcp not yet implemented".into(),
654 ))
655 }
656
657 fn list_mcp_servers(
658 &mut self,
659 _params: kernel::ListMcpServersParams,
660 _results: kernel::ListMcpServersResults,
661 ) -> capnp::capability::Promise<(), capnp::Error> {
662 capnp::capability::Promise::err(capnp::Error::unimplemented(
663 "list_mcp_servers not yet implemented".into(),
664 ))
665 }
666
667 fn snapshot(
668 &mut self,
669 _params: kernel::SnapshotParams,
670 _results: kernel::SnapshotResults,
671 ) -> capnp::capability::Promise<(), capnp::Error> {
672 capnp::capability::Promise::err(capnp::Error::unimplemented(
673 "snapshot not yet implemented".into(),
674 ))
675 }
676
677 fn restore(
678 &mut self,
679 _params: kernel::RestoreParams,
680 _results: kernel::RestoreResults,
681 ) -> capnp::capability::Promise<(), capnp::Error> {
682 capnp::capability::Promise::err(capnp::Error::unimplemented(
683 "restore not yet implemented".into(),
684 ))
685 }
686
687 fn read_blob(
688 &mut self,
689 params: kernel::ReadBlobParams,
690 mut results: kernel::ReadBlobResults,
691 ) -> capnp::capability::Promise<(), capnp::Error> {
692 let vfs = self.kernel.vfs();
693
694 let id = match params.get() {
695 Ok(p) => match p.get_id() {
696 Ok(s) => match s.to_str() {
697 Ok(s) => s.to_string(),
698 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
699 },
700 Err(e) => return capnp::capability::Promise::err(e),
701 },
702 Err(e) => return capnp::capability::Promise::err(e),
703 };
704
705 capnp::capability::Promise::from_future(async move {
706 let path = PathBuf::from(format!("/v/blobs/{}", id));
707
708 let data = vfs.read(&path).await.map_err(|e| {
710 capnp::Error::failed(format!("failed to read blob {}: {}", id, e))
711 })?;
712
713 let stream_impl = BlobStreamImpl::new(data);
715 let stream_client: blob_stream::Client = capnp_rpc::new_client(stream_impl);
716
717 results.get().set_stream(stream_client);
718 Ok(())
719 })
720 }
721
722 fn write_blob(
723 &mut self,
724 params: kernel::WriteBlobParams,
725 mut results: kernel::WriteBlobResults,
726 ) -> capnp::capability::Promise<(), capnp::Error> {
727 let vfs = self.kernel.vfs();
728
729 let (content_type, _size) = match params.get() {
730 Ok(p) => {
731 let ct = match p.get_content_type() {
732 Ok(s) => match s.to_str() {
733 Ok(s) => s.to_string(),
734 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
735 },
736 Err(e) => return capnp::capability::Promise::err(e),
737 };
738 let size = p.get_size();
739 (ct, size)
740 },
741 Err(e) => return capnp::capability::Promise::err(e),
742 };
743
744 let id = generate_blob_id();
746 let path = PathBuf::from(format!("/v/blobs/{}", id));
747
748 tracing::debug!("Creating blob {} with content type {}", id, content_type);
750
751 let sink_impl = BlobSinkImpl::new(vfs, path);
753 let sink_client: blob_sink::Client = capnp_rpc::new_client(sink_impl);
754
755 let mut builder = results.get();
756 builder.set_id(&id);
757 builder.set_stream(sink_client);
758
759 capnp::capability::Promise::ok(())
760 }
761
762 fn delete_blob(
763 &mut self,
764 params: kernel::DeleteBlobParams,
765 mut results: kernel::DeleteBlobResults,
766 ) -> capnp::capability::Promise<(), capnp::Error> {
767 let vfs = self.kernel.vfs();
768
769 let id = match params.get() {
770 Ok(p) => match p.get_id() {
771 Ok(s) => match s.to_str() {
772 Ok(s) => s.to_string(),
773 Err(e) => return capnp::capability::Promise::err(capnp::Error::failed(format!("invalid utf8: {}", e))),
774 },
775 Err(e) => return capnp::capability::Promise::err(e),
776 },
777 Err(e) => return capnp::capability::Promise::err(e),
778 };
779
780 capnp::capability::Promise::from_future(async move {
781 let path = PathBuf::from(format!("/v/blobs/{}", id));
782
783 let success = match vfs.remove(&path).await {
785 Ok(()) => true,
786 Err(e) => {
787 tracing::warn!("Failed to delete blob {}: {}", id, e);
788 false
789 }
790 };
791
792 results.get().set_success(success);
793 Ok(())
794 })
795 }
796}
797
798use kaish_schema::value;
803use kaish_schema::{output_data, output_node};
804use crate::ast::Value;
805use crate::interpreter::{EntryType, OutputData, OutputNode};
806
807fn set_value(builder: &mut value::Builder<'_>, value: &Value) {
809 match value {
810 Value::Null => builder.set_null(()),
811 Value::Bool(b) => builder.set_bool(*b),
812 Value::Int(i) => builder.set_int(*i),
813 Value::Float(f) => builder.set_float(*f),
814 Value::String(s) => builder.set_string(s),
815 Value::Json(json) => {
816 set_json_value(builder.reborrow(), json);
818 }
819 Value::Blob(blob) => {
820 let mut blob_builder = builder.reborrow().init_blob();
821 blob_builder.set_id(&blob.id);
822 blob_builder.set_size(blob.size);
823 blob_builder.set_content_type(&blob.content_type);
824 if let Some(hash) = &blob.hash {
825 blob_builder.set_hash(hash);
826 }
827 }
828 }
829}
830
831fn set_json_value(mut builder: value::Builder<'_>, json: &serde_json::Value) {
833 match json {
834 serde_json::Value::Null => builder.set_null(()),
835 serde_json::Value::Bool(b) => builder.set_bool(*b),
836 serde_json::Value::Number(n) => {
837 if let Some(i) = n.as_i64() {
838 builder.set_int(i);
839 } else if let Some(f) = n.as_f64() {
840 builder.set_float(f);
841 } else {
842 builder.set_string(n.to_string());
843 }
844 }
845 serde_json::Value::String(s) => builder.set_string(s),
846 serde_json::Value::Array(arr) => {
847 let mut array_builder = builder.init_array(arr.len() as u32);
848 for (i, item) in arr.iter().enumerate() {
849 set_json_value(array_builder.reborrow().get(i as u32), item);
850 }
851 }
852 serde_json::Value::Object(obj) => {
853 let mut object_builder = builder.init_object(obj.len() as u32);
854 for (i, (key, val)) in obj.iter().enumerate() {
855 let mut entry = object_builder.reborrow().get(i as u32);
856 entry.set_key(key);
857 set_json_value(entry.init_value(), val);
858 }
859 }
860 }
861}
862
863fn set_output_data(builder: &mut output_data::Builder<'_>, output: &OutputData) {
865 if let Some(ref headers) = output.headers {
867 let mut headers_builder = builder.reborrow().init_headers(headers.len() as u32);
868 for (i, header) in headers.iter().enumerate() {
869 headers_builder.set(i as u32, header);
870 }
871 }
872
873 let mut root_builder = builder.reborrow().init_root(output.root.len() as u32);
875 for (i, node) in output.root.iter().enumerate() {
876 let mut node_builder = root_builder.reborrow().get(i as u32);
877 set_output_node(&mut node_builder, node);
878 }
879}
880
881fn set_output_node(builder: &mut output_node::Builder<'_>, node: &OutputNode) {
883 builder.set_name(&node.name);
884
885 use kaish_schema::kaish_capnp::EntryType as SchemaEntryType;
887 let entry_type = match node.entry_type {
888 EntryType::Text => SchemaEntryType::Text,
889 EntryType::File => SchemaEntryType::File,
890 EntryType::Directory => SchemaEntryType::Directory,
891 EntryType::Executable => SchemaEntryType::Executable,
892 EntryType::Symlink => SchemaEntryType::Symlink,
893 };
894 builder.set_entry_type(entry_type);
895
896 if let Some(ref text) = node.text {
898 builder.set_text(text);
899 }
900
901 if !node.cells.is_empty() {
903 let mut cells_builder = builder.reborrow().init_cells(node.cells.len() as u32);
904 for (i, cell) in node.cells.iter().enumerate() {
905 cells_builder.set(i as u32, cell);
906 }
907 }
908
909 if !node.children.is_empty() {
911 let mut children_builder = builder.reborrow().init_children(node.children.len() as u32);
912 for (i, child) in node.children.iter().enumerate() {
913 let mut child_builder = children_builder.reborrow().get(i as u32);
914 set_output_node(&mut child_builder, child);
915 }
916 }
917}
918
919fn read_value(reader: &value::Reader<'_>) -> Result<Value, capnp::Error> {
923 use value::Which;
924 match reader.which()? {
925 Which::Null(()) => Ok(Value::Null),
926 Which::Bool(b) => Ok(Value::Bool(b)),
927 Which::Int(i) => Ok(Value::Int(i)),
928 Which::Float(f) => Ok(Value::Float(f)),
929 Which::String(s) => {
930 let text = s?;
931 let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
932 Ok(Value::String(string.to_string()))
933 }
934 Which::Array(arr) => {
935 let arr = arr?;
937 let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
938 let json_array = serde_json::Value::Array(items?);
939 Ok(Value::String(json_array.to_string()))
940 }
941 Which::Object(obj) => {
942 let obj = obj?;
944 let mut map = serde_json::Map::new();
945 for kv in obj.iter() {
946 let key_text = kv.get_key()?;
947 let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
948 let val = read_value_to_json(&kv.get_value()?)?;
949 map.insert(key, val);
950 }
951 Ok(Value::String(serde_json::Value::Object(map).to_string()))
952 }
953 Which::Blob(blob) => {
954 let blob = blob?;
956 let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
957 let size = blob.get_size();
958 let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
959 let hash = blob.get_hash()?;
960
961 let mut map = serde_json::Map::new();
962 map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
963 map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
964 map.insert("size".to_string(), serde_json::Value::Number(size.into()));
965 map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
966 if !hash.is_empty() {
967 let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
968 map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
969 }
970 Ok(Value::String(serde_json::Value::Object(map).to_string()))
971 }
972 }
973}
974
975fn read_value_to_json(reader: &value::Reader<'_>) -> Result<serde_json::Value, capnp::Error> {
977 use value::Which;
978 match reader.which()? {
979 Which::Null(()) => Ok(serde_json::Value::Null),
980 Which::Bool(b) => Ok(serde_json::Value::Bool(b)),
981 Which::Int(i) => Ok(serde_json::Value::Number(i.into())),
982 Which::Float(f) => Ok(serde_json::Number::from_f64(f)
983 .map(serde_json::Value::Number)
984 .unwrap_or(serde_json::Value::Null)),
985 Which::String(s) => {
986 let text = s?;
987 let string = text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
988 Ok(serde_json::Value::String(string.to_string()))
989 }
990 Which::Array(arr) => {
991 let arr = arr?;
992 let items: Result<Vec<_>, _> = arr.iter().map(|v| read_value_to_json(&v)).collect();
993 Ok(serde_json::Value::Array(items?))
994 }
995 Which::Object(obj) => {
996 let obj = obj?;
997 let mut map = serde_json::Map::new();
998 for kv in obj.iter() {
999 let key_text = kv.get_key()?;
1000 let key = key_text.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?.to_string();
1001 let val = read_value_to_json(&kv.get_value()?)?;
1002 map.insert(key, val);
1003 }
1004 Ok(serde_json::Value::Object(map))
1005 }
1006 Which::Blob(blob) => {
1007 let blob = blob?;
1009 let id = blob.get_id()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1010 let size = blob.get_size();
1011 let content_type = blob.get_content_type()?.to_str().map_err(|e| capnp::Error::failed(format!("invalid utf8: {}", e)))?;
1012 let hash = blob.get_hash()?;
1013
1014 let mut map = serde_json::Map::new();
1015 map.insert("_type".to_string(), serde_json::Value::String("blob".to_string()));
1016 map.insert("id".to_string(), serde_json::Value::String(id.to_string()));
1017 map.insert("size".to_string(), serde_json::Value::Number(size.into()));
1018 map.insert("contentType".to_string(), serde_json::Value::String(content_type.to_string()));
1019 if !hash.is_empty() {
1020 let hash_hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
1021 map.insert("hash".to_string(), serde_json::Value::String(hash_hex));
1022 }
1023 Ok(serde_json::Value::Object(map))
1024 }
1025 }
1026}