1use std::{future::ready, sync::Arc};
2
3use selium_abi::hostcalls::Hostcall;
4use selium_abi::{GuestUint, IoFrame, IoRead, IoWrite};
5use wasmtime::Caller;
6
7use crate::{
8 guest_data::{GuestError, GuestResult},
9 operation::{Contract, Operation},
10 registry::{InstanceRegistry, ResourceHandle, ResourceType},
11};
12
13pub trait IoCapability {
15 type Handle: Send;
16 type Reader: Send + Unpin;
17 type Writer: Send + Unpin;
18 type Error: Into<GuestError>;
19
20 fn new_writer(&self, handle: &Self::Handle) -> Result<Self::Writer, Self::Error>;
22
23 fn new_reader(&self, handle: &Self::Handle) -> Result<Self::Reader, Self::Error>;
25
26 fn read(
28 &self,
29 reader: &mut Self::Reader,
30 len: usize,
31 ) -> impl Future<Output = Result<IoFrame, Self::Error>> + Send;
32
33 fn write(
35 &self,
36 writer: &mut Self::Writer,
37 bytes: &[u8],
38 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
39}
40
/// Hostcall driver that creates a reader from a handle using the wrapped
/// [`IoCapability`] implementation.
pub struct IoCreateReaderDriver<Impl>(Impl);

/// Hostcall driver that reads from a previously created reader using the
/// wrapped [`IoCapability`] implementation.
pub struct IoReadDriver<Impl>(Impl);

/// Hostcall driver that creates a writer from a handle using the wrapped
/// [`IoCapability`] implementation.
pub struct IoCreateWriterDriver<Impl>(Impl);

/// Hostcall driver that writes a payload to a previously created writer using
/// the wrapped [`IoCapability`] implementation.
pub struct IoWriteDriver<Impl>(Impl);
45
46impl<T> IoCapability for Arc<T>
47where
48 T: IoCapability,
49{
50 type Handle = T::Handle;
51 type Reader = T::Reader;
52 type Writer = T::Writer;
53 type Error = T::Error;
54
55 fn new_reader(&self, handle: &Self::Handle) -> Result<Self::Reader, Self::Error> {
56 self.as_ref().new_reader(handle)
57 }
58
59 fn new_writer(&self, handle: &Self::Handle) -> Result<Self::Writer, Self::Error> {
60 self.as_ref().new_writer(handle)
61 }
62
63 fn read(
64 &self,
65 reader: &mut Self::Reader,
66 len: usize,
67 ) -> impl Future<Output = Result<IoFrame, Self::Error>> {
68 self.as_ref().read(reader, len)
69 }
70
71 fn write(
72 &self,
73 writer: &mut Self::Writer,
74 bytes: &[u8],
75 ) -> impl Future<Output = Result<(), Self::Error>> {
76 self.as_ref().write(writer, bytes)
77 }
78}
79
80impl<Impl> Contract for IoCreateReaderDriver<Impl>
81where
82 Impl: IoCapability + Clone + Send + 'static,
83{
84 type Input = GuestUint;
85 type Output = GuestUint;
86
87 fn to_future(
88 &self,
89 caller: &mut Caller<'_, InstanceRegistry>,
90 input: Self::Input,
91 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
92 let this = self.0.clone();
93 let idx = caller
94 .data()
95 .entry(input as usize)
96 .ok_or(GuestError::NotFound);
97 let registry = caller.data().registry_arc();
98
99 let result = (|| -> GuestResult<GuestUint> {
100 let idx = idx?;
101 let reader = registry
102 .with(ResourceHandle::<Impl::Handle>::new(idx), move |handle| {
103 this.new_reader(handle)
104 })
105 .expect("Invalid resource id from InstanceRegistry")
106 .map_err(Into::into)?;
107
108 let slot = caller
109 .data_mut()
110 .insert(reader, None, ResourceType::Reader)
111 .map_err(GuestError::from)?;
112 if let Some(resource_id) = caller.data().entry(slot) {
113 registry.record_parent(resource_id, idx);
114 }
115 GuestUint::try_from(slot).map_err(|_| GuestError::InvalidArgument)
116 })();
117
118 ready(result)
119 }
120}
121
122impl<Impl> Contract for IoReadDriver<Impl>
123where
124 Impl: IoCapability + Clone + Send + 'static,
125{
126 type Input = IoRead;
127 type Output = IoFrame;
128
129 fn to_future(
130 &self,
131 caller: &mut Caller<'_, InstanceRegistry>,
132 input: Self::Input,
133 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
134 let this = self.0.clone();
135 let idx = caller
136 .data()
137 .entry(input.handle as usize)
138 .ok_or(GuestError::NotFound);
139 let registry = caller.data().registry_arc();
140 let len = input.len as usize;
141
142 async move {
143 registry
144 .with_async(ResourceHandle::<Impl::Reader>::new(idx?), move |reader| {
145 Box::pin(async move { this.read(reader, len).await })
146 })
147 .await
148 .expect("Invalid resource id from InstanceRegistry")
149 .map_err(Into::into)
150 }
151 }
152}
153
154impl<Impl> Contract for IoCreateWriterDriver<Impl>
155where
156 Impl: IoCapability + Clone + Send + 'static,
157{
158 type Input = GuestUint;
159 type Output = GuestUint;
160
161 fn to_future(
162 &self,
163 caller: &mut Caller<'_, InstanceRegistry>,
164 input: Self::Input,
165 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
166 let this = self.0.clone();
167 let idx = caller
168 .data()
169 .entry(input as usize)
170 .ok_or(GuestError::NotFound);
171 let registry = caller.data().registry_arc();
172
173 let result = (|| -> GuestResult<GuestUint> {
174 let idx = idx?;
175 let writer = registry
176 .with(ResourceHandle::<Impl::Handle>::new(idx), move |channel| {
177 this.new_writer(channel)
178 })
179 .expect("Invalid resource id from InstanceRegistry")
180 .map_err(Into::into)?;
181
182 let slot = caller
183 .data_mut()
184 .insert(writer, None, ResourceType::Writer)
185 .map_err(GuestError::from)?;
186 if let Some(resource_id) = caller.data().entry(slot) {
187 registry.record_parent(resource_id, idx);
188 }
189 GuestUint::try_from(slot).map_err(|_| GuestError::InvalidArgument)
190 })();
191
192 ready(result)
193 }
194}
195
196impl<Impl> Contract for IoWriteDriver<Impl>
197where
198 Impl: IoCapability + Clone + Send + 'static,
199{
200 type Input = IoWrite;
201 type Output = GuestUint;
202
203 fn to_future(
204 &self,
205 caller: &mut Caller<'_, InstanceRegistry>,
206 input: Self::Input,
207 ) -> impl Future<Output = GuestResult<Self::Output>> + 'static {
208 let this = self.0.clone();
209 let payload = input.payload;
210 let idx = caller
211 .data()
212 .entry(input.handle as usize)
213 .ok_or(GuestError::NotFound);
214 let registry = caller.data().registry_arc();
215 let payload_len = payload.len();
216
217 async move {
218 registry
219 .with_async(ResourceHandle::<Impl::Writer>::new(idx?), move |writer| {
220 Box::pin(async move { this.write(writer, &payload).await })
221 })
222 .await
223 .expect("Invalid resource id from InstanceRegistry")
224 .map_err(Into::into)?;
225
226 let count =
227 GuestUint::try_from(payload_len).map_err(|_| GuestError::InvalidArgument)?;
228 Ok(count)
229 }
230 }
231}
232
233pub fn create_reader_op<C>(
234 cap: C,
235 hostcall: &'static Hostcall<GuestUint, GuestUint>,
236) -> Arc<Operation<IoCreateReaderDriver<C>>>
237where
238 C: IoCapability + Clone + Send + 'static,
239{
240 Operation::from_hostcall(IoCreateReaderDriver(cap), hostcall)
241}
242
243pub fn read_op<C>(
244 cap: C,
245 hostcall: &'static Hostcall<IoRead, IoFrame>,
246) -> Arc<Operation<IoReadDriver<C>>>
247where
248 C: IoCapability + Clone + Send + 'static,
249{
250 Operation::from_hostcall(IoReadDriver(cap), hostcall)
251}
252
253pub fn create_writer_op<C>(
254 cap: C,
255 hostcall: &'static Hostcall<GuestUint, GuestUint>,
256) -> Arc<Operation<IoCreateWriterDriver<C>>>
257where
258 C: IoCapability + Clone + Send + 'static,
259{
260 Operation::from_hostcall(IoCreateWriterDriver(cap), hostcall)
261}
262
263pub fn write_op<C>(
264 cap: C,
265 hostcall: &'static Hostcall<IoWrite, GuestUint>,
266) -> Arc<Operation<IoWriteDriver<C>>>
267where
268 C: IoCapability + Clone + Send + 'static,
269{
270 Operation::from_hostcall(IoWriteDriver(cap), hostcall)
271}