spacetimedb/host/wasmtime/wasm_instance_env.rs
1#![allow(clippy::too_many_arguments)]
2
3use std::time::Instant;
4
5use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
6use crate::host::instance_env::{ChunkPool, InstanceEnv};
7use crate::host::wasm_common::instrumentation;
8use crate::host::wasm_common::module_host_actor::ExecutionTimings;
9use crate::host::wasm_common::{
10 err_to_errno, instrumentation::CallTimes, AbiRuntimeError, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx,
11 TimingSpanSet,
12};
13use crate::host::AbiCall;
14use anyhow::Context as _;
15use spacetimedb_lib::Timestamp;
16use spacetimedb_primitives::{errno, ColId};
17use wasmtime::{AsContext, Caller, StoreContextMut};
18
19use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr};
20
21#[cfg(not(feature = "spacetimedb-wasm-instance-env-times"))]
22use instrumentation::noop as span;
23#[cfg(feature = "spacetimedb-wasm-instance-env-times")]
24use instrumentation::op as span;
25
26/// A `WasmInstanceEnv` provides the connection between a module
27/// and the database.
28///
29/// A `WasmInstanceEnv` associates an `InstanceEnv` (responsible for
30/// the database instance and its associated state) with a wasm
31/// `Mem`. It also contains the resources (`Buffers` and
32/// `BufferIters`) needed to manage the ABI contract between modules
33/// and the host.
34///
35/// Once created, a `WasmInstanceEnv` must be instantiated with a `Mem`
36/// exactly once.
37///
38/// Some of the state associated to a `WasmInstanceEnv` is per reducer invocation.
39/// For instance, module-defined timing spans are per reducer.
40pub(super) struct WasmInstanceEnv {
41 /// The database `InstanceEnv` associated to this instance.
42 instance_env: InstanceEnv,
43
44 /// The `Mem` associated to this instance. At construction time,
45 /// this is always `None`. The `Mem` instance is extracted from the
46 /// instance exports, and after instantiation is complete, this will
47 /// always be `Some`.
48 mem: Option<Mem>,
49
50 /// The arguments being passed to a reducer
51 /// that it can read via [`Self::bytes_source_read`].
52 call_reducer_args: Option<(bytes::Bytes, usize)>,
53
54 /// The standard sink used for [`Self::bytes_sink_write`].
55 standard_bytes_sink: Option<Vec<u8>>,
56
57 /// The slab of `BufferIters` created for this instance.
58 iters: RowIters,
59
60 /// Track time spent in module-defined spans.
61 timing_spans: TimingSpanSet,
62
63 /// The point in time the last reducer call started at.
64 reducer_start: Instant,
65
66 /// Track time spent in all wasm instance env calls (aka syscall time).
67 ///
68 /// Each function, like `insert`, will add the `Duration` spent in it
69 /// to this tracker.
70 call_times: CallTimes,
71
72 /// The last, including current, reducer to be executed by this environment.
73 reducer_name: String,
74 /// A pool of unused allocated chunks that can be reused.
75 // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
76 chunk_pool: ChunkPool,
77}
78
79const CALL_REDUCER_ARGS_SOURCE: u32 = 1;
80const STANDARD_BYTES_SINK: u32 = 1;
81
82type WasmResult<T> = Result<T, WasmError>;
83type RtResult<T> = anyhow::Result<T>;
84
85/// Wraps an `InstanceEnv` with the magic necessary to push
86/// and pull bytes from webassembly memory.
87impl WasmInstanceEnv {
88 /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`.
89 pub fn new(instance_env: InstanceEnv) -> Self {
90 let reducer_start = Instant::now();
91 Self {
92 instance_env,
93 mem: None,
94 call_reducer_args: None,
95 standard_bytes_sink: None,
96 iters: Default::default(),
97 timing_spans: Default::default(),
98 reducer_start,
99 call_times: CallTimes::new(),
100 reducer_name: String::from("<initializing>"),
101 chunk_pool: <_>::default(),
102 }
103 }
104
105 /// Finish the instantiation of this instance with the provided `Mem`.
106 pub fn instantiate(&mut self, mem: Mem) {
107 assert!(self.mem.is_none());
108 self.mem = Some(mem);
109 }
110
111 /// Returns a reference to the memory, assumed to be initialized.
112 pub fn get_mem(&self) -> Mem {
113 self.mem.expect("Initialized memory")
114 }
115 fn mem_env<'a>(ctx: impl Into<StoreContextMut<'a, Self>>) -> (&'a mut MemView, &'a mut Self) {
116 let ctx = ctx.into();
117 let mem = ctx.data().get_mem();
118 mem.view_and_store_mut(ctx)
119 }
120
121 /// Return a reference to the `InstanceEnv`,
122 /// which is responsible for DB instance and associated state.
123 pub fn instance_env(&self) -> &InstanceEnv {
124 &self.instance_env
125 }
126
127 /// Setup the standard bytes sink and return a handle to it for writing.
128 pub fn setup_standard_bytes_sink(&mut self) -> u32 {
129 self.standard_bytes_sink = Some(Vec::new());
130 STANDARD_BYTES_SINK
131 }
132
133 /// Extract all the bytes written to the standard bytes sink
134 /// and prevent further writes to it.
135 pub fn take_standard_bytes_sink(&mut self) -> Vec<u8> {
136 self.standard_bytes_sink.take().unwrap_or_default()
137 }
138
139 /// Signal to this `WasmInstanceEnv` that a reducer call is beginning.
140 ///
141 /// Returns the handle used by reducers to read from `args`
142 /// as well as the handle used to write the error message, if any.
143 pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (u32, u32) {
144 let errors = self.setup_standard_bytes_sink();
145
146 // Pass an invalid source when the reducer args were empty.
147 // This allows the module to avoid allocating and make a system call in those cases.
148 self.call_reducer_args = (!args.is_empty()).then_some((args, 0));
149 let args = if self.call_reducer_args.is_some() {
150 CALL_REDUCER_ARGS_SOURCE
151 } else {
152 0
153 };
154
155 self.reducer_start = Instant::now();
156 name.clone_into(&mut self.reducer_name);
157 self.instance_env.start_reducer(ts);
158
159 (args, errors)
160 }
161
162 /// Returns the name of the most recent reducer to be run in this environment.
163 pub fn reducer_name(&self) -> &str {
164 &self.reducer_name
165 }
166
167 /// Returns the name of the most recent reducer to be run in this environment.
168 pub fn reducer_start(&self) -> Instant {
169 self.reducer_start
170 }
171
172 /// Signal to this `WasmInstanceEnv` that a reducer call is over.
173 /// This resets all of the state associated to a single reducer call,
174 /// and returns instrumentation records.
175 pub fn finish_reducer(&mut self) -> (ExecutionTimings, Vec<u8>) {
176 // For the moment,
177 // we only explicitly clear the source/sink buffers and the "syscall" times.
178 // TODO: should we be clearing `iters` and/or `timing_spans`?
179
180 let total_duration = self.reducer_start.elapsed();
181
182 // Taking the call times record also resets timings to 0s for the next call.
183 let wasm_instance_env_call_times = self.call_times.take();
184
185 let timings = ExecutionTimings {
186 total_duration,
187 wasm_instance_env_call_times,
188 };
189
190 self.call_reducer_args = None;
191 (timings, self.take_standard_bytes_sink())
192 }
193
194 fn with_span<R>(mut caller: Caller<'_, Self>, func: AbiCall, run: impl FnOnce(&mut Caller<'_, Self>) -> R) -> R {
195 let span_start = span::CallSpanStart::new(func);
196
197 // Call `run` with the caller and a handle to the memory.
198 let result = run(&mut caller);
199
200 // Track the span of this call.
201 let span = span_start.end();
202 span::record_span(&mut caller.data_mut().call_times, span);
203
204 result
205 }
206
207 fn convert_wasm_result<T: From<u16>>(func: AbiCall, err: WasmError) -> RtResult<T> {
208 Err(match err {
209 WasmError::Db(err) => match err_to_errno(&err) {
210 Some(errno) => {
211 log::debug!(
212 "abi call to {func} returned an errno: {errno} ({})",
213 errno::strerror(errno).unwrap_or("<unknown>")
214 );
215 return Ok(errno.get().into());
216 }
217 None => anyhow::Error::from(AbiRuntimeError { func, err }),
218 },
219 WasmError::BufferTooSmall => return Ok(errno::BUFFER_TOO_SMALL.get().into()),
220 WasmError::Wasm(err) => err,
221 })
222 }
223
224 /// Call the function `run` with the name `func`.
225 /// The function `run` is provided with the callers environment and the host's memory.
226 ///
227 /// One of `cvt_custom`, `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
228 /// host call, to provide consistent error handling and instrumentation.
229 ///
230 /// Some database errors are logged but are otherwise regarded as `Ok(_)`.
231 /// See `err_to_errno` for a list.
232 ///
233 /// This variant should be used when more control is needed over the success value.
234 fn cvt_custom<T: From<u16>>(
235 caller: Caller<'_, Self>,
236 func: AbiCall,
237 run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<T>,
238 ) -> RtResult<T> {
239 Self::with_span(caller, func, run).or_else(|err| Self::convert_wasm_result(func, err))
240 }
241
242 /// Call the function `run` with the name `func`.
243 /// The function `run` is provided with the callers environment and the host's memory.
244 ///
245 /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
246 /// host call, to provide consistent error handling and instrumentation.
247 ///
248 /// Some database errors are logged but are otherwise regarded as `Ok(_)`.
249 /// See `err_to_errno` for a list.
250 fn cvt<T: From<u16>>(
251 caller: Caller<'_, Self>,
252 func: AbiCall,
253 run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<()>,
254 ) -> RtResult<T> {
255 Self::cvt_custom(caller, func, |c| run(c).map(|()| 0u16.into()))
256 }
257
258 /// Call the function `f` with any return value being written to the pointer `out`.
259 ///
260 /// Otherwise, `cvt_ret` (this function) behaves as `cvt`.
261 ///
262 /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
263 /// host call, to provide consistent error handling and instrumentation.
264 ///
265 /// This method should be used as opposed to a manual implementation,
266 /// as it helps with upholding the safety invariants of [`bindings_sys::call`].
267 ///
268 /// Returns an error if writing `T` to `out` errors.
269 fn cvt_ret<O: WasmPointee>(
270 caller: Caller<'_, Self>,
271 call: AbiCall,
272 out: WasmPtr<O>,
273 f: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<O>,
274 ) -> RtResult<u32> {
275 Self::cvt(caller, call, |caller| {
276 f(caller).and_then(|ret| {
277 let (mem, _) = Self::mem_env(caller);
278 ret.write_to(mem, out).map_err(|e| e.into())
279 })
280 })
281 }
282
283 /// Call the function `f`.
284 ///
285 /// This is the version of `cvt` or `cvt_ret` for functions with no return value.
286 /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
287 /// host call, to provide consistent error handling and instrumentation.
288 fn cvt_noret(caller: Caller<'_, Self>, call: AbiCall, f: impl FnOnce(&mut Caller<'_, Self>)) {
289 Self::with_span(caller, call, f)
290 }
291
292 fn convert_u32_to_col_id(col_id: u32) -> WasmResult<ColId> {
293 let col_id: u16 = col_id
294 .try_into()
295 .context("ABI violation, a `ColId` must be a `u16`")
296 .map_err(WasmError::Wasm)?;
297 Ok(col_id.into())
298 }
299
300 /// Queries the `table_id` associated with the given (table) `name`
301 /// where `name` is the UTF-8 slice in WASM memory at `name_ptr[..name_len]`.
302 ///
303 /// The table id is written into the `out` pointer.
304 ///
305 /// # Traps
306 ///
307 /// Traps if:
308 /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
309 /// - `name` is not valid UTF-8.
310 /// - `out` is NULL or `out[..size_of::<TableId>()]` is not in bounds of WASM memory.
311 ///
312 /// # Errors
313 ///
314 /// Returns an error:
315 ///
316 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
317 /// - `NO_SUCH_TABLE`, when `name` is not the name of a table.
318 #[tracing::instrument(level = "trace", skip_all)]
319 pub fn table_id_from_name(
320 caller: Caller<'_, Self>,
321 name: WasmPtr<u8>,
322 name_len: u32,
323 out: WasmPtr<u32>,
324 ) -> RtResult<u32> {
325 Self::cvt_ret::<u32>(caller, AbiCall::TableIdFromName, out, |caller| {
326 let (mem, env) = Self::mem_env(caller);
327 // Read the table name from WASM memory.
328 let name = mem.deref_str(name, name_len)?;
329
330 // Query the table id.
331 Ok(env.instance_env.table_id_from_name(name)?.into())
332 })
333 }
334
335 /// Queries the `index_id` associated with the given (index) `name`
336 /// where `name` is the UTF-8 slice in WASM memory at `name_ptr[..name_len]`.
337 ///
338 /// The index id is written into the `out` pointer.
339 ///
340 /// # Traps
341 ///
342 /// Traps if:
343 /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
344 /// - `name` is not valid UTF-8.
345 /// - `out` is NULL or `out[..size_of::<IndexId>()]` is not in bounds of WASM memory.
346 ///
347 /// # Errors
348 ///
349 /// Returns an error:
350 ///
351 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
352 /// - `NO_SUCH_INDEX`, when `name` is not the name of an index.
353 #[tracing::instrument(level = "trace", skip_all)]
354 pub fn index_id_from_name(
355 caller: Caller<'_, Self>,
356 name: WasmPtr<u8>,
357 name_len: u32,
358 out: WasmPtr<u32>,
359 ) -> RtResult<u32> {
360 Self::cvt_ret::<u32>(caller, AbiCall::IndexIdFromName, out, |caller| {
361 let (mem, env) = Self::mem_env(caller);
362 // Read the index name from WASM memory.
363 let name = mem.deref_str(name, name_len)?;
364
365 // Query the index id.
366 Ok(env.instance_env.index_id_from_name(name)?.into())
367 })
368 }
369
370 /// Writes the number of rows currently in table identified by `table_id` to `out`.
371 ///
372 /// # Traps
373 ///
374 /// Traps if:
375 /// - `out` is NULL or `out[..size_of::<u64>()]` is not in bounds of WASM memory.
376 ///
377 /// # Errors
378 ///
379 /// Returns an error:
380 ///
381 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
382 /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
383 #[tracing::instrument(level = "trace", skip_all)]
384 pub fn datastore_table_row_count(caller: Caller<'_, Self>, table_id: u32, out: WasmPtr<u64>) -> RtResult<u32> {
385 Self::cvt_ret::<u64>(caller, AbiCall::DatastoreTableRowCount, out, |caller| {
386 let (_, env) = Self::mem_env(caller);
387 Ok(env.instance_env.datastore_table_row_count(table_id.into())?)
388 })
389 }
390
391 /// Starts iteration on each row, as BSATN-encoded, of a table identified by `table_id`.
392 ///
393 /// On success, the iterator handle is written to the `out` pointer.
394 /// This handle can be advanced by [`row_iter_bsatn_advance`].
395 ///
396 /// # Traps
397 ///
398 /// This function does not trap.
399 ///
400 /// # Errors
401 ///
402 /// Returns an error:
403 ///
404 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
405 /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
406 // #[tracing::instrument(level = "trace", skip_all)]
407 pub fn datastore_table_scan_bsatn(
408 caller: Caller<'_, Self>,
409 table_id: u32,
410 out: WasmPtr<RowIterIdx>,
411 ) -> RtResult<u32> {
412 Self::cvt_ret(caller, AbiCall::DatastoreTableScanBsatn, out, |caller| {
413 let env = caller.data_mut();
414 // Collect the iterator chunks.
415 let chunks = env
416 .instance_env
417 .datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id.into())?;
418 // Register the iterator and get back the index to write to `out`.
419 // Calls to the iterator are done through dynamic dispatch.
420 Ok(env.iters.insert(chunks.into_iter()))
421 })
422 }
423
424 /// Finds all rows in the index identified by `index_id`,
425 /// according to the:
426 /// - `prefix = prefix_ptr[..prefix_len]`,
427 /// - `rstart = rstart_ptr[..rstart_len]`,
428 /// - `rend = rend_ptr[..rend_len]`,
429 ///
430 /// in WASM memory.
431 ///
432 /// The index itself has a schema/type.
433 /// The `prefix` is decoded to the initial `prefix_elems` `AlgebraicType`s
434 /// whereas `rstart` and `rend` are decoded to the `prefix_elems + 1` `AlgebraicType`
435 /// where the `AlgebraicValue`s are wrapped in `Bound`.
436 /// That is, `rstart, rend` are BSATN-encoded `Bound<AlgebraicValue>`s.
437 ///
438 /// Matching is then defined by equating `prefix`
439 /// to the initial `prefix_elems` columns of the index
440 /// and then imposing `rstart` as the starting bound
441 /// and `rend` as the ending bound on the `prefix_elems + 1` column of the index.
442 /// Remaining columns of the index are then unbounded.
443 /// Note that the `prefix` in this case can be empty (`prefix_elems = 0`),
444 /// in which case this becomes a ranged index scan on a single-col index
445 /// or even a full table scan if `rstart` and `rend` are both unbounded.
446 ///
447 /// The relevant table for the index is found implicitly via the `index_id`,
448 /// which is unique for the module.
449 ///
450 /// On success, the iterator handle is written to the `out` pointer.
451 /// This handle can be advanced by [`row_iter_bsatn_advance`].
452 ///
453 /// # Non-obvious queries
454 ///
455 /// For an index on columns `[a, b, c]`:
456 ///
457 /// - `a = x, b = y` is encoded as a prefix `[x, y]`
458 /// and a range `Range::Unbounded`,
459 /// or as a prefix `[x]` and a range `rstart = rend = Range::Inclusive(y)`.
460 /// - `a = x, b = y, c = z` is encoded as a prefix `[x, y]`
461 /// and a range `rstart = rend = Range::Inclusive(z)`.
462 /// - A sorted full scan is encoded as an empty prefix
463 /// and a range `Range::Unbounded`.
464 ///
465 /// # Traps
466 ///
467 /// Traps if:
468 /// - `prefix_elems > 0`
469 /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
470 /// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
471 /// - `rend` is NULL or `rend` is not in bounds of WASM memory.
472 /// - `out` is NULL or `out[..size_of::<RowIter>()]` is not in bounds of WASM memory.
473 ///
474 /// # Errors
475 ///
476 /// Returns an error:
477 ///
478 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
479 /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
480 /// - `WRONG_INDEX_ALGO` if the index is not a range-scan compatible index.
481 /// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
482 /// a `prefix_elems` number of `AlgebraicValue`
483 /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
484 /// Or when `rstart` or `rend` cannot be decoded to an `Bound<AlgebraicValue>`
485 /// where the inner `AlgebraicValue`s are
486 /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
487 pub fn datastore_index_scan_range_bsatn(
488 caller: Caller<'_, Self>,
489 index_id: u32,
490 prefix_ptr: WasmPtr<u8>,
491 prefix_len: u32,
492 prefix_elems: u32,
493 rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
494 rstart_len: u32,
495 rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
496 rend_len: u32,
497 out: WasmPtr<RowIterIdx>,
498 ) -> RtResult<u32> {
499 Self::cvt_ret(caller, AbiCall::DatastoreIndexScanRangeBsatn, out, |caller| {
500 let prefix_elems = Self::convert_u32_to_col_id(prefix_elems)?;
501
502 let (mem, env) = Self::mem_env(caller);
503 // Read the prefix and range start & end from WASM memory.
504 let prefix = if prefix_elems.idx() == 0 {
505 &[]
506 } else {
507 mem.deref_slice(prefix_ptr, prefix_len)?
508 };
509 let rstart = mem.deref_slice(rstart_ptr, rstart_len)?;
510 let rend = mem.deref_slice(rend_ptr, rend_len)?;
511
512 // Find the relevant rows.
513 let chunks = env.instance_env.datastore_index_scan_range_bsatn_chunks(
514 &mut env.chunk_pool,
515 index_id.into(),
516 prefix,
517 prefix_elems,
518 rstart,
519 rend,
520 )?;
521
522 // Insert the encoded + concatenated rows into a new buffer and return its id.
523 Ok(env.iters.insert(chunks.into_iter()))
524 })
525 }
526
527 /// Deprecated name for [`Self::datastore_index_scan_range_bsatn`].
528 #[deprecated = "use `datastore_index_scan_range_bsatn` instead"]
529 pub fn datastore_btree_scan_bsatn(
530 caller: Caller<'_, Self>,
531 index_id: u32,
532 prefix_ptr: WasmPtr<u8>,
533 prefix_len: u32,
534 prefix_elems: u32,
535 rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
536 rstart_len: u32,
537 rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
538 rend_len: u32,
539 out: WasmPtr<RowIterIdx>,
540 ) -> RtResult<u32> {
541 Self::datastore_index_scan_range_bsatn(
542 caller,
543 index_id,
544 prefix_ptr,
545 prefix_len,
546 prefix_elems,
547 rstart_ptr,
548 rstart_len,
549 rend_ptr,
550 rend_len,
551 out,
552 )
553 }
554
555 /// Reads rows from the given iterator registered under `iter`.
556 ///
557 /// Takes rows from the iterator
558 /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`,
559 /// encoded in BSATN format.
560 ///
561 /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
562 /// On success (`0` or `-1` is returned),
563 /// `buffer_len` is set to the combined length of the encoded rows.
564 /// When `-1` is returned, the iterator has been exhausted
565 /// and there are no more rows to read,
566 /// leading to the iterator being immediately destroyed.
567 /// Note that the host is free to reuse allocations in a pool,
568 /// destroying the handle logically does not entail that memory is necessarily reclaimed.
569 ///
570 /// # Traps
571 ///
572 /// Traps if:
573 ///
574 /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
575 /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
576 ///
577 /// # Errors
578 ///
579 /// Returns an error:
580 ///
581 /// - `NO_SUCH_ITER`, when `iter` is not a valid iterator.
582 /// - `BUFFER_TOO_SMALL`, when there are rows left but they cannot fit in `buffer`.
583 /// When this occurs, `buffer_len` is set to the size of the next item in the iterator.
584 /// To make progress, the caller should reallocate the buffer to at least that size and try again.
585 // #[tracing::instrument(level = "trace", skip_all)]
586 pub fn row_iter_bsatn_advance(
587 caller: Caller<'_, Self>,
588 iter: u32,
589 buffer_ptr: WasmPtr<u8>,
590 buffer_len_ptr: WasmPtr<u32>,
591 ) -> RtResult<i32> {
592 let row_iter_idx = RowIterIdx(iter);
593 Self::cvt_custom(caller, AbiCall::RowIterBsatnAdvance, |caller| {
594 let (mem, env) = Self::mem_env(caller);
595
596 // Retrieve the iterator by `row_iter_idx`, or error.
597 let Some(iter) = env.iters.get_mut(row_iter_idx) else {
598 return Ok(errno::NO_SUCH_ITER.get().into());
599 };
600
601 // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
602 let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
603 let write_buffer_len = |mem, len| u32::try_from(len).unwrap().write_to(mem, buffer_len_ptr);
604 // Get a mutable view to the `buffer`.
605 let mut buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?;
606
607 let mut written = 0;
608 // Fill the buffer as much as possible.
609 while let Some(chunk) = iter.as_slice().first() {
610 let Some((buf_chunk, rest)) = buffer.split_at_mut_checked(chunk.len()) else {
611 // Cannot fit chunk into the buffer,
612 // either because we already filled it too much,
613 // or because it is too small.
614 break;
615 };
616 buf_chunk.copy_from_slice(chunk);
617 written += chunk.len();
618 buffer = rest;
619
620 // Advance the iterator, as we used a chunk.
621 // SAFETY: We peeked one `chunk`, so there must be one at least.
622 let chunk = unsafe { iter.next().unwrap_unchecked() };
623 env.chunk_pool.put(chunk);
624 }
625
626 let ret = match (written, iter.as_slice().first()) {
627 // Nothing was written and the iterator is not exhausted.
628 (0, Some(chunk)) => {
629 write_buffer_len(mem, chunk.len())?;
630 return Ok(errno::BUFFER_TOO_SMALL.get().into());
631 }
632 // The iterator is exhausted, destroy it, and tell the caller.
633 (_, None) => {
634 env.iters.take(row_iter_idx);
635 -1
636 }
637 // Something was written, but the iterator is not exhausted.
638 (_, Some(_)) => 0,
639 };
640 write_buffer_len(mem, written)?;
641 Ok(ret)
642 })
643 }
644
645 /// Destroys the iterator registered under `iter`.
646 ///
647 /// Once `row_iter_bsatn_close` is called on `iter`, the `iter` is invalid.
648 /// That is, `row_iter_bsatn_close(iter)` the second time will yield `NO_SUCH_ITER`.
649 ///
650 /// # Errors
651 ///
652 /// Returns an error:
653 ///
654 /// - `NO_SUCH_ITER`, when `iter` is not a valid iterator.
655 // #[tracing::instrument(level = "trace", skip_all)]
656 pub fn row_iter_bsatn_close(caller: Caller<'_, Self>, iter: u32) -> RtResult<u32> {
657 let row_iter_idx = RowIterIdx(iter);
658 Self::cvt_custom(caller, AbiCall::RowIterBsatnClose, |caller| {
659 let (_, env) = Self::mem_env(caller);
660
661 // Retrieve the iterator by `row_iter_idx`, or error.
662 Ok(match env.iters.take(row_iter_idx) {
663 None => errno::NO_SUCH_ITER.get().into(),
664 // TODO(Centril): consider putting these into a pool for reuse.
665 Some(_) => 0,
666 })
667 })
668 }
669
670 /// Inserts a row into the table identified by `table_id`,
671 /// where the row is read from the byte string `row = row_ptr[..row_len]` in WASM memory
672 /// where `row_len = row_len_ptr[..size_of::<usize>()]` stores the capacity of `row`.
673 ///
674 /// The byte string `row` must be a BSATN-encoded `ProductValue`
675 /// typed at the table's `ProductType` row-schema.
676 ///
677 /// To handle auto-incrementing columns,
678 /// when the call is successful,
679 /// the `row` is written back to with the generated sequence values.
680 /// These values are written as a BSATN-encoded `pv: ProductValue`.
681 /// Each `v: AlgebraicValue` in `pv` is typed at the sequence's column type.
682 /// The `v`s in `pv` are ordered by the order of the columns, in the schema of the table.
683 /// When the table has no sequences,
684 /// this implies that the `pv`, and thus `row`, will be empty.
685 /// The `row_len` is set to the length of `bsatn(pv)`.
686 ///
687 /// # Traps
688 ///
689 /// Traps if:
690 /// - `row_len_ptr` is NULL or `row_len` is not in bounds of WASM memory.
691 /// - `row_ptr` is NULL or `row` is not in bounds of WASM memory.
692 ///
693 /// # Errors
694 ///
695 /// Returns an error:
696 ///
697 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
698 /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
699 /// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`.
700 /// typed at the `ProductType` the table's schema specifies.
701 /// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
702 /// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
703 #[tracing::instrument(level = "trace", skip_all)]
704 pub fn datastore_insert_bsatn(
705 caller: Caller<'_, Self>,
706 table_id: u32,
707 row_ptr: WasmPtr<u8>,
708 row_len_ptr: WasmPtr<u32>,
709 ) -> RtResult<u32> {
710 Self::cvt(caller, AbiCall::DatastoreInsertBsatn, |caller| {
711 let (mem, env) = Self::mem_env(caller);
712
713 // Read `row-len`, i.e., the capacity of `row` pointed to by `row_ptr`.
714 let row_len = u32::read_from(mem, row_len_ptr)?;
715 // Get a mutable view to the `row`.
716 let row = mem.deref_slice_mut(row_ptr, row_len)?;
717
718 // Insert the row into the DB and write back the generated column values.
719 let row_len = env.instance_env.insert(table_id.into(), row)?;
720 u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?;
721 Ok(())
722 })
723 }
724
725 /// Updates a row in the table identified by `table_id` to `row`
726 /// where the row is read from the byte string `row = row_ptr[..row_len]` in WASM memory
727 /// where `row_len = row_len_ptr[..size_of::<usize>()]` stores the capacity of `row`.
728 ///
729 /// The byte string `row` must be a BSATN-encoded `ProductValue`
730 /// typed at the table's `ProductType` row-schema.
731 ///
732 /// The row to update is found by projecting `row`
733 /// to the type of the *unique* index identified by `index_id`.
734 /// If no row is found, the error `NO_SUCH_ROW` is returned.
735 ///
736 /// To handle auto-incrementing columns,
737 /// when the call is successful,
738 /// the `row` is written back to with the generated sequence values.
739 /// These values are written as a BSATN-encoded `pv: ProductValue`.
740 /// Each `v: AlgebraicValue` in `pv` is typed at the sequence's column type.
741 /// The `v`s in `pv` are ordered by the order of the columns, in the schema of the table.
742 /// When the table has no sequences,
743 /// this implies that the `pv`, and thus `row`, will be empty.
744 /// The `row_len` is set to the length of `bsatn(pv)`.
745 ///
746 /// # Traps
747 ///
748 /// Traps if:
749 /// - `row_len_ptr` is NULL or `row_len` is not in bounds of WASM memory.
750 /// - `row_ptr` is NULL or `row` is not in bounds of WASM memory.
751 ///
752 /// # Errors
753 ///
754 /// Returns an error:
755 ///
756 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
757 /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
758 /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
759 /// - `INDEX_NOT_UNIQUE`, when the index was not unique.
760 /// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`
761 /// typed at the `ProductType` the table's schema specifies
762 /// or when it cannot be projected to the index identified by `index_id`.
763 /// - `NO_SUCH_ROW`, when the row was not found in the unique index.
764 /// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
765 /// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
766 #[tracing::instrument(level = "trace", skip_all)]
767 pub fn datastore_update_bsatn(
768 caller: Caller<'_, Self>,
769 table_id: u32,
770 index_id: u32,
771 row_ptr: WasmPtr<u8>,
772 row_len_ptr: WasmPtr<u32>,
773 ) -> RtResult<u32> {
774 Self::cvt(caller, AbiCall::DatastoreUpdateBsatn, |caller| {
775 let (mem, env) = Self::mem_env(caller);
776
777 // Read `row-len`, i.e., the capacity of `row` pointed to by `row_ptr`.
778 let row_len = u32::read_from(mem, row_len_ptr)?;
779 // Get a mutable view to the `row`.
780 let row = mem.deref_slice_mut(row_ptr, row_len)?;
781
782 // Update the row in the DB and write back the generated column values.
783 let row_len = env.instance_env.update(table_id.into(), index_id.into(), row)?;
784 u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?;
785 Ok(())
786 })
787 }
788
789 /// Deletes all rows found in the index identified by `index_id`,
790 /// according to the:
791 /// - `prefix = prefix_ptr[..prefix_len]`,
792 /// - `rstart = rstart_ptr[..rstart_len]`,
793 /// - `rend = rend_ptr[..rend_len]`,
794 ///
795 /// in WASM memory.
796 ///
797 /// This syscall will delete all the rows found by
798 /// [`datastore_index_scan_range_bsatn`] with the same arguments passed,
799 /// including `prefix_elems`.
800 /// See `datastore_index_scan_range_bsatn` for details.
801 ///
802 /// The number of rows deleted is written to the WASM pointer `out`.
803 ///
804 /// # Traps
805 ///
806 /// Traps if:
807 /// - `prefix_elems > 0`
808 /// and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
809 /// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
810 /// - `rend` is NULL or `rend` is not in bounds of WASM memory.
811 /// - `out` is NULL or `out[..size_of::<u32>()]` is not in bounds of WASM memory.
812 ///
813 /// # Errors
814 ///
815 /// Returns an error:
816 ///
817 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
818 /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
819 /// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
820 /// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
821 /// a `prefix_elems` number of `AlgebraicValue`
822 /// typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
823 /// Or when `rstart` or `rend` cannot be decoded to an `Bound<AlgebraicValue>`
824 /// where the inner `AlgebraicValue`s are
825 /// typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
826 pub fn datastore_delete_by_index_scan_range_bsatn(
827 caller: Caller<'_, Self>,
828 index_id: u32,
829 prefix_ptr: WasmPtr<u8>,
830 prefix_len: u32,
831 prefix_elems: u32,
832 rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
833 rstart_len: u32,
834 rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
835 rend_len: u32,
836 out: WasmPtr<u32>,
837 ) -> RtResult<u32> {
838 Self::cvt_ret(caller, AbiCall::DatastoreDeleteByIndexScanRangeBsatn, out, |caller| {
839 let prefix_elems = Self::convert_u32_to_col_id(prefix_elems)?;
840
841 let (mem, env) = Self::mem_env(caller);
842 // Read the prefix and range start & end from WASM memory.
843 let prefix = if prefix_elems.idx() == 0 {
844 &[]
845 } else {
846 mem.deref_slice(prefix_ptr, prefix_len)?
847 };
848 let rstart = mem.deref_slice(rstart_ptr, rstart_len)?;
849 let rend = mem.deref_slice(rend_ptr, rend_len)?;
850
851 // Delete the relevant rows.
852 Ok(env.instance_env.datastore_delete_by_index_scan_range_bsatn(
853 index_id.into(),
854 prefix,
855 prefix_elems,
856 rstart,
857 rend,
858 )?)
859 })
860 }
861
862 /// Deprecated name for [`Self::datastore_delete_by_index_scan_range_bsatn`].
863 #[deprecated = "use `datastore_delete_by_index_scan_range_bsatn` instead"]
864 pub fn datastore_delete_by_btree_scan_bsatn(
865 caller: Caller<'_, Self>,
866 index_id: u32,
867 prefix_ptr: WasmPtr<u8>,
868 prefix_len: u32,
869 prefix_elems: u32,
870 rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
871 rstart_len: u32,
872 rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
873 rend_len: u32,
874 out: WasmPtr<u32>,
875 ) -> RtResult<u32> {
876 Self::datastore_delete_by_index_scan_range_bsatn(
877 caller,
878 index_id,
879 prefix_ptr,
880 prefix_len,
881 prefix_elems,
882 rstart_ptr,
883 rstart_len,
884 rend_ptr,
885 rend_len,
886 out,
887 )
888 }
889
890 /// Deletes those rows, in the table identified by `table_id`,
891 /// that match any row in the byte string `rel = rel_ptr[..rel_len]` in WASM memory.
892 ///
893 /// Matching is defined by first BSATN-decoding
894 /// the byte string pointed to at by `relation` to a `Vec<ProductValue>`
895 /// according to the row schema of the table
896 /// and then using `Ord for AlgebraicValue`.
897 /// A match happens when `Ordering::Equal` is returned from `fn cmp`.
898 /// This occurs exactly when the row's BSATN-encoding is equal to the encoding of the `ProductValue`.
899 ///
900 /// The number of rows deleted is written to the WASM pointer `out`.
901 ///
902 /// # Traps
903 ///
904 /// Traps if:
905 /// - `rel_ptr` is NULL or `rel` is not in bounds of WASM memory.
906 /// - `out` is NULL or `out[..size_of::<u32>()]` is not in bounds of WASM memory.
907 ///
908 /// # Errors
909 ///
910 /// Returns an error:
911 ///
912 /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
913 /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
914 /// - `BSATN_DECODE_ERROR`, when `rel` cannot be decoded to `Vec<ProductValue>`
915 /// where each `ProductValue` is typed at the `ProductType` the table's schema specifies.
916 #[tracing::instrument(level = "trace", skip_all)]
917 pub fn datastore_delete_all_by_eq_bsatn(
918 caller: Caller<'_, Self>,
919 table_id: u32,
920 rel_ptr: WasmPtr<u8>,
921 rel_len: u32,
922 out: WasmPtr<u32>,
923 ) -> RtResult<u32> {
924 Self::cvt_ret(caller, AbiCall::DatastoreDeleteAllByEqBsatn, out, |caller| {
925 let (mem, env) = Self::mem_env(caller);
926 let relation = mem.deref_slice(rel_ptr, rel_len)?;
927 Ok(env
928 .instance_env
929 .datastore_delete_all_by_eq_bsatn(table_id.into(), relation)?)
930 })
931 }
932
933 pub fn volatile_nonatomic_schedule_immediate(
934 caller: Caller<'_, Self>,
935 name: WasmPtr<u8>,
936 name_len: u32,
937 args: WasmPtr<u8>,
938 args_len: u32,
939 ) -> RtResult<()> {
940 Self::with_span(caller, AbiCall::VolatileNonatomicScheduleImmediate, |caller| {
941 let (mem, env) = Self::mem_env(caller);
942 let name = mem.deref_str(name, name_len)?;
943 let args = mem.deref_slice(args, args_len)?;
944 env.instance_env.scheduler.volatile_nonatomic_schedule_immediate(
945 name.to_owned(),
946 crate::host::ReducerArgs::Bsatn(args.to_vec().into()),
947 );
948
949 Ok(())
950 })
951 }
952
953 /// Reads bytes from `source`, registered in the host environment,
954 /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`.
955 ///
956 /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
957 /// On success (`0` or `-1` is returned),
958 /// `buffer_len` is set to the number of bytes written to `buffer`.
959 /// When `-1` is returned, the resource has been exhausted
960 /// and there are no more bytes to read,
961 /// leading to the resource being immediately destroyed.
962 /// Note that the host is free to reuse allocations in a pool,
963 /// destroying the handle logically does not entail that memory is necessarily reclaimed.
964 ///
965 /// # Traps
966 ///
967 /// Traps if:
968 ///
969 /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
970 /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
971 ///
972 /// # Errors
973 ///
974 /// Returns an error:
975 ///
976 /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source.
977 ///
978 /// # Example
979 ///
980 /// The typical use case for this ABI is in `__call_reducer__`,
981 /// to read and deserialize the `args`.
982 /// An example definition, dealing with `args` might be:
983 /// ```rust,ignore
984 /// /// #[no_mangle]
985 /// extern "C" fn __call_reducer__(..., args: BytesSource, ...) -> i16 {
986 /// // ...
987 ///
988 /// let mut buf = Vec::<u8>::with_capacity(1024);
989 /// loop {
990 /// // Write into the spare capacity of the buffer.
991 /// let buf_ptr = buf.spare_capacity_mut();
992 /// let spare_len = buf_ptr.len();
993 /// let mut buf_len = buf_ptr.len();
994 /// let buf_ptr = buf_ptr.as_mut_ptr().cast();
995 /// let ret = unsafe { bytes_source_read(args, buf_ptr, &mut buf_len) };
996 /// // SAFETY: `bytes_source_read` just appended `spare_len` bytes to `buf`.
997 /// unsafe { buf.set_len(buf.len() + spare_len) };
998 /// match ret {
999 /// // Host side source exhausted, we're done.
1000 /// -1 => break,
1001 /// // Wrote the entire spare capacity.
1002 /// // Need to reserve more space in the buffer.
1003 /// 0 if spare_len == buf_len => buf.reserve(1024),
1004 /// // Host didn't write as much as possible.
1005 /// // Try to read some more.
1006 /// // The host will likely not trigger this branch,
1007 /// // but a module should be prepared for it.
1008 /// 0 => {}
1009 /// _ => unreachable!(),
1010 /// }
1011 /// }
1012 ///
1013 /// // ...
1014 /// }
1015 /// ```
1016 pub fn bytes_source_read(
1017 caller: Caller<'_, Self>,
1018 source: u32,
1019 buffer_ptr: WasmPtr<u8>,
1020 buffer_len_ptr: WasmPtr<u32>,
1021 ) -> RtResult<i32> {
1022 Self::cvt_custom(caller, AbiCall::BytesSourceRead, |caller| {
1023 let (mem, env) = Self::mem_env(caller);
1024
1025 // Retrieve the reducer args if available and requested, or error.
1026 let Some((reducer_args, cursor)) = env
1027 .call_reducer_args
1028 .as_mut()
1029 .filter(|_| source == CALL_REDUCER_ARGS_SOURCE)
1030 else {
1031 return Ok(errno::NO_SUCH_BYTES.get().into());
1032 };
1033
1034 // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
1035 let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
1036 // Get a mutable view to the `buffer`.
1037 let buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?;
1038 let buffer_len = buffer_len as usize;
1039
1040 // Derive the portion that we can read and what remains,
1041 // based on what is left to read and the capacity.
1042 let left_to_read = &reducer_args[*cursor..];
1043 let can_read_len = buffer_len.min(left_to_read.len());
1044 let (can_read, remainder) = left_to_read.split_at(can_read_len);
1045 // Copy to the `buffer` and write written bytes count to `buffer_len`.
1046 buffer[..can_read_len].copy_from_slice(can_read);
1047 (can_read_len as u32).write_to(mem, buffer_len_ptr)?;
1048
1049 // Destroy the source if exhausted, or advance `cursor`.
1050 if remainder.is_empty() {
1051 env.call_reducer_args = None;
1052 Ok(-1i32)
1053 } else {
1054 *cursor += can_read_len;
1055 Ok(0)
1056 }
1057 })
1058 }
1059
1060 /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`,
1061 /// to the `sink`, registered in the host environment.
1062 ///
1063 /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
1064 /// On success (`0` is returned),
1065 /// `buffer_len` is set to the number of bytes written to `sink`.
1066 ///
1067 /// # Traps
1068 ///
1069 /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
1070 /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
1071 ///
1072 /// # Errors
1073 ///
1074 /// Returns an error:
1075 ///
1076 /// - `NO_SUCH_BYTES`, when `sink` is not a valid bytes sink.
1077 /// - `NO_SPACE`, when there is no room for more bytes in `sink`.
1078 /// (Doesn't currently happen.)
1079 pub fn bytes_sink_write(
1080 caller: Caller<'_, Self>,
1081 sink: u32,
1082 buffer_ptr: WasmPtr<u8>,
1083 buffer_len_ptr: WasmPtr<u32>,
1084 ) -> RtResult<u32> {
1085 Self::cvt_custom(caller, AbiCall::BytesSinkWrite, |caller| {
1086 let (mem, env) = Self::mem_env(caller);
1087
1088 // Retrieve the reducer args if available and requested, or error.
1089 let Some(sink) = env.standard_bytes_sink.as_mut().filter(|_| sink == STANDARD_BYTES_SINK) else {
1090 return Ok(errno::NO_SUCH_BYTES.get().into());
1091 };
1092
1093 // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
1094 let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
1095 // Write `buffer` to `sink`.
1096 let buffer = mem.deref_slice(buffer_ptr, buffer_len)?;
1097 sink.extend(buffer);
1098
1099 Ok(0)
1100 })
1101 }
1102
1103 /// Logs at `level` a `message` message occurring in `filename:line_number`
1104 /// with [`target`](target) being the module path at the `log!` invocation site.
1105 ///
1106 /// These various pointers are interpreted lossily as UTF-8 strings with a corresponding `_len`.
1107 ///
1108 /// The `target` and `filename` pointers are ignored by passing `NULL`.
1109 /// The line number is ignored if `line_number == u32::MAX`.
1110 ///
1111 /// No message is logged if
1112 /// - `target != NULL && target + target_len > u64::MAX`
1113 /// - `filename != NULL && filename + filename_len > u64::MAX`
1114 /// - `message + message_len > u64::MAX`
1115 ///
1116 /// # Traps
1117 ///
1118 /// Traps if:
1119 /// - `target` is not NULL and `target_ptr[..target_len]` is not in bounds of WASM memory.
1120 /// - `filename` is not NULL and `filename_ptr[..filename_len]` is not in bounds of WASM memory.
1121 /// - `message` is not NULL and `message_ptr[..message_len]` is not in bounds of WASM memory.
1122 ///
1123 /// [target]: https://docs.rs/log/latest/log/struct.Record.html#method.target
1124 #[tracing::instrument(level = "trace", skip_all)]
1125 pub fn console_log(
1126 caller: Caller<'_, Self>,
1127 level: u32,
1128 target_ptr: WasmPtr<u8>,
1129 target_len: u32,
1130 filename_ptr: WasmPtr<u8>,
1131 filename_len: u32,
1132 line_number: u32,
1133 message_ptr: WasmPtr<u8>,
1134 message_len: u32,
1135 ) {
1136 let do_console_log = |caller: &mut Caller<'_, Self>| -> WasmResult<()> {
1137 let env = caller.data();
1138 let mem = env.get_mem().view(&caller);
1139
1140 // Read the `target`, `filename`, and `message` strings from WASM memory.
1141 let target = mem.deref_str_lossy(target_ptr, target_len).check_nullptr()?;
1142 let filename = mem.deref_str_lossy(filename_ptr, filename_len).check_nullptr()?;
1143 let message = mem.deref_str_lossy(message_ptr, message_len)?;
1144
1145 // The line number cannot be `u32::MAX` as this represents `Option::None`.
1146 let line_number = (line_number != u32::MAX).then_some(line_number);
1147
1148 let record = Record {
1149 // TODO: figure out whether to use walltime now or logical reducer now (env.reducer_start)
1150 ts: chrono::Utc::now(),
1151 target: target.as_deref(),
1152 filename: filename.as_deref(),
1153 line_number,
1154 message: &message,
1155 };
1156
1157 // Write the log record to the `DatabaseLogger` in the database instance context (replica_ctx).
1158 env.instance_env
1159 .console_log((level as u8).into(), &record, &caller.as_context());
1160 Ok(())
1161 };
1162 Self::cvt_noret(caller, AbiCall::ConsoleLog, |caller| {
1163 let _ = do_console_log(caller);
1164 })
1165 }
1166
1167 /// Begins a timing span with `name = name_ptr[..name_len]`.
1168 ///
1169 /// When the returned `ConsoleTimerId` is passed to [`console_timer_end`],
1170 /// the duration between the calls will be printed to the module's logs.
1171 ///
1172 /// The `name` is interpreted lossily as a UTF-8 string.
1173 ///
1174 /// # Traps
1175 ///
1176 /// Traps if:
1177 /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
1178 pub fn console_timer_start(caller: Caller<'_, Self>, name_ptr: WasmPtr<u8>, name_len: u32) -> RtResult<u32> {
1179 Self::with_span(caller, AbiCall::ConsoleTimerStart, |caller| {
1180 let (mem, env) = Self::mem_env(caller);
1181 let name = mem.deref_str_lossy(name_ptr, name_len)?.into_owned();
1182 Ok(env.timing_spans.insert(TimingSpan::new(name)).0)
1183 })
1184 }
1185
1186 pub fn console_timer_end(caller: Caller<'_, Self>, span_id: u32) -> RtResult<u32> {
1187 Self::cvt_custom(caller, AbiCall::ConsoleTimerEnd, |caller| {
1188 let Some(span) = caller.data_mut().timing_spans.take(TimingSpanIdx(span_id)) else {
1189 return Ok(errno::NO_SUCH_CONSOLE_TIMER.get().into());
1190 };
1191
1192 let elapsed = span.start.elapsed();
1193 let message = format!("Timing span {:?}: {:?}", &span.name, elapsed);
1194
1195 let record = Record {
1196 ts: chrono::Utc::now(),
1197 target: None,
1198 filename: None,
1199 line_number: None,
1200 message: &message,
1201 };
1202 caller.data().instance_env.console_log(
1203 crate::database_logger::LogLevel::Info,
1204 &record,
1205 &caller.as_context(),
1206 );
1207 Ok(0)
1208 })
1209 }
1210
1211 /// Writes the identity of the module into `out = out_ptr[..32]`.
1212 ///
1213 /// # Traps
1214 ///
1215 /// Traps if:
1216 ///
1217 /// - `out_ptr` is NULL or `out` is not in bounds of WASM memory.
1218 pub fn identity(caller: Caller<'_, Self>, out_ptr: WasmPtr<u8>) -> RtResult<()> {
1219 // Use `with_span` rather than one of the `cvt_*` functions,
1220 // as we want to possibly trap, but not to return an error code.
1221 Self::with_span(caller, AbiCall::Identity, |caller| {
1222 let (mem, env) = Self::mem_env(caller);
1223 let identity = env.instance_env.replica_ctx.database.database_identity;
1224 // We're implicitly casting `out_ptr` to `WasmPtr<Identity>` here.
1225 // (Both types are actually `u32`.)
1226 // This works because `Identity::write_to` does not require an aligned pointer,
1227 // as it gets a `&mut [u8]` from WASM memory and does `copy_from_slice` with it.
1228 identity.write_to(mem, out_ptr)?;
1229 Ok(())
1230 })
1231 }
1232}
1233
1234impl<T> BacktraceProvider for wasmtime::StoreContext<'_, T> {
1235 fn capture(&self) -> Box<dyn ModuleBacktrace> {
1236 Box::new(wasmtime::WasmBacktrace::capture(self))
1237 }
1238}
1239
1240impl ModuleBacktrace for wasmtime::WasmBacktrace {
1241 fn frames(&self) -> Vec<BacktraceFrame<'_>> {
1242 self.frames()
1243 .iter()
1244 .map(|f| BacktraceFrame {
1245 module_name: None,
1246 func_name: f.func_name(),
1247 })
1248 .collect()
1249 }
1250}