1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::cell::RefCell;
use std::sync::Arc;

use wasmrs::flux::*;
use wasmrs::runtime::{spawn, UnboundedReceiver};
use wasmrs::{Frame, Payload, PayloadError, RSocket, WasmSocket};

use crate::context::{EngineProvider, SharedContext};

type Result<T> = std::result::Result<T, crate::errors::Error>;

#[must_use]
#[allow(missing_debug_implementations)]
pub struct Host {
  engine: RefCell<Box<dyn EngineProvider>>,
  mtu: usize,
}

impl Host {
  pub fn new<T: EngineProvider + 'static>(engine: T) -> Result<Self> {
    let host = Host {
      engine: RefCell::new(Box::new(engine)),
      mtu: 256,
    };

    host.engine.borrow_mut().init()?;

    Ok(host)
  }

  pub fn new_context(&self) -> Result<CallContext> {
    let mut socket = WasmSocket::new(HostServer {}, wasmrs::SocketSide::Host);
    let rx = socket.take_rx().unwrap();
    let socket = Arc::new(socket);

    let context = self.engine.borrow().new_context(socket.clone())?;
    context.init()?;
    spawn_writer(rx, context.clone());

    CallContext::new(self.mtu, socket, context)
  }
}

fn spawn_writer(mut rx: UnboundedReceiver<Frame>, context: SharedContext) {
  spawn(async move {
    while let Some(frame) = rx.recv().await {
      let _ = context.write_frame(frame);
    }
  });
}

struct HostServer {}

impl RSocket for HostServer {
  fn fire_and_forget(&self, _req: Payload) -> Mono<(), PayloadError> {
    todo!()
  }

  fn request_response(&self, _payload: Payload) -> Mono<Payload, PayloadError> {
    todo!();
  }

  fn request_stream(&self, _req: Payload) -> FluxReceiver<Payload, PayloadError> {
    todo!()
  }

  fn request_channel(&self, _reqs: FluxReceiver<Payload, PayloadError>) -> FluxReceiver<Payload, PayloadError> {
    todo!()
  }
}

pub struct CallContext {
  socket: Arc<WasmSocket>,
  context: SharedContext,
}

impl std::fmt::Debug for CallContext {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("WasmRsCallContext")
      .field("state", &self.socket)
      .finish()
  }
}

impl CallContext {
  fn new(_mtu: usize, socket: Arc<WasmSocket>, context: SharedContext) -> Result<Self> {
    Ok(Self { socket, context })
  }

  pub fn get_import(&self, namespace: &str, operation: &str) -> Result<u32> {
    self.context.get_import(namespace, operation)
  }

  pub fn get_export(&self, namespace: &str, operation: &str) -> Result<u32> {
    self.context.get_export(namespace, operation)
  }

  pub fn dump_operations(&self) {
    println!("{:#?}", self.context.get_operation_list());
  }
}

impl RSocket for CallContext {
  fn fire_and_forget(&self, payload: Payload) -> Mono<(), PayloadError> {
    self.socket.fire_and_forget(payload)
  }

  fn request_response(&self, payload: Payload) -> Mono<Payload, PayloadError> {
    self.socket.request_response(payload)
  }

  fn request_stream(&self, payload: Payload) -> FluxReceiver<Payload, PayloadError> {
    self.socket.request_stream(payload)
  }

  fn request_channel(&self, stream: FluxReceiver<Payload, PayloadError>) -> FluxReceiver<Payload, PayloadError> {
    self.socket.request_channel(stream)
  }
}