embly_wrapper/
instance.rs

1use {
2    crate::{
3        bytes::u64_as_u8_le,
4        context::{next_message, write_msg, EmblyCtx},
5        error::{Error, Result},
6        protos::comms::Message,
7    },
8    log::debug,
9    lucet_runtime::{DlModule, Limits, MmapRegion, Module, Region},
10    lucet_runtime_internals::{instance::InstanceHandle, module::ModuleInternal},
11    lucet_wasi::WasiCtxBuilder,
12    std::{
13        io::prelude::*,
14        os::unix::net::UnixStream,
15        sync::{
16            atomic::{AtomicBool, Ordering},
17            mpsc::channel,
18            Arc, Mutex,
19        },
20        thread,
21    },
22};
23
24pub struct Instance {
25    pub inst: InstanceHandle,
26    parent_address: u64,
27    your_address: u64,
28    stream_closer: UnixStream,
29    running: Arc<Mutex<AtomicBool>>,
30}
31
32impl Instance {
33    pub fn new(
34        embly_module: String,
35        addr_string: String,
36        mut master_socket: UnixStream,
37    ) -> Result<Self> {
38        let running = Arc::new(Mutex::new(AtomicBool::new(true)));
39        let module = DlModule::load(&embly_module)?;
40        // TODO: support memory constraints
41        let min_globals_size = module.globals().len() * std::mem::size_of::<u64>();
42        let globals_size = ((min_globals_size + 4096 - 1) / 4096) * 4096;
43        let region = MmapRegion::create(
44            1,
45            &Limits {
46                globals_size,
47                heap_memory_size: 4_294_967_296,
48                stack_size: 8_388_608,
49                heap_address_space_size: 8_589_934_592,
50            },
51        )?;
52        // TODO: likely remove inherit_env
53        let ctx = WasiCtxBuilder::new()
54            .inherit_stdio()
55            .inherit_env()
56            .env("RUST_BACKTRACE", "1");
57        let socket_writer = master_socket.try_clone()?;
58        let stream_closer = master_socket.try_clone()?;
59        let (sender, receiver) = channel();
60        let addr = addr_string.parse::<u64>().unwrap();
61        master_socket.write_all(&u64_as_u8_le(addr))?;
62
63        let thread_running = running.clone();
64        thread::spawn(move || loop {
65            debug!("reading bytes");
66            if let Ok(msg) = next_message(&mut master_socket) {
67                // channel has an infinite buffer, so this is where our messages go
68                sender.send(msg).unwrap();
69            } else {
70                if thread_running.lock().unwrap().load(Ordering::Relaxed) {
71                    panic!("error reading from socket, no longer listening");
72                }
73                break;
74            }
75        });
76        let msg: Message = receiver.recv()?;
77        debug!("got first message {:?}", msg);
78        if msg.parent_address == 0 || msg.your_address == 0 {
79            println!("either parent address or your address values are zero");
80            return Err(Error::InvalidStartup(msg));
81        }
82        if msg.your_address != addr {
83            panic!("addr doesn't match {} {}", addr, msg.your_address)
84        }
85        let parent_address = msg.parent_address;
86        let your_address = msg.your_address;
87        let embly_ctx = EmblyCtx::new(receiver, socket_writer, your_address, parent_address);
88        let inst = region
89            .new_instance_builder(module as Arc<dyn Module>)
90            .with_embed_ctx(ctx.build().expect("WASI ctx can be created"))
91            .with_embed_ctx(embly_ctx)
92            .build()?;
93        Ok(Instance {
94            running,
95            inst,
96            parent_address,
97            your_address,
98            stream_closer,
99        })
100    }
101
102    pub fn send_exit_message(&mut self, exit_code: i32) -> Result<()> {
103        // inst.get_embed_ctx_mut()
104        let mut msg = Message::new();
105        msg.exit = exit_code; //todo: u32
106        msg.exiting = true;
107        msg.from = self.your_address;
108        msg.to = self.parent_address;
109        write_msg(&mut self.stream_closer, msg)?;
110        self.running.lock().unwrap().store(false, Ordering::Relaxed);
111        Ok(())
112    }
113}