spacetimedb/host/wasmtime/
wasm_instance_env.rs

1#![allow(clippy::too_many_arguments)]
2
3use std::time::Instant;
4
5use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record};
6use crate::host::instance_env::{ChunkPool, InstanceEnv};
7use crate::host::wasm_common::instrumentation;
8use crate::host::wasm_common::module_host_actor::ExecutionTimings;
9use crate::host::wasm_common::{
10    err_to_errno, instrumentation::CallTimes, AbiRuntimeError, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx,
11    TimingSpanSet,
12};
13use crate::host::AbiCall;
14use anyhow::Context as _;
15use spacetimedb_lib::Timestamp;
16use spacetimedb_primitives::{errno, ColId};
17use wasmtime::{AsContext, Caller, StoreContextMut};
18
19use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr};
20
21#[cfg(not(feature = "spacetimedb-wasm-instance-env-times"))]
22use instrumentation::noop as span;
23#[cfg(feature = "spacetimedb-wasm-instance-env-times")]
24use instrumentation::op as span;
25
26/// A `WasmInstanceEnv` provides the connection between a module
27/// and the database.
28///
29/// A `WasmInstanceEnv` associates an `InstanceEnv` (responsible for
30/// the database instance and its associated state) with a wasm
31/// `Mem`. It also contains the resources (`Buffers` and
32/// `BufferIters`) needed to manage the ABI contract between modules
33/// and the host.
34///
35/// Once created, a `WasmInstanceEnv` must be instantiated with a `Mem`
36/// exactly once.
37///
38/// Some of the state associated to a `WasmInstanceEnv` is per reducer invocation.
39/// For instance, module-defined timing spans are per reducer.
40pub(super) struct WasmInstanceEnv {
41    /// The database `InstanceEnv` associated to this instance.
42    instance_env: InstanceEnv,
43
44    /// The `Mem` associated to this instance. At construction time,
45    /// this is always `None`. The `Mem` instance is extracted from the
46    /// instance exports, and after instantiation is complete, this will
47    /// always be `Some`.
48    mem: Option<Mem>,
49
50    /// The arguments being passed to a reducer
51    /// that it can read via [`Self::bytes_source_read`].
52    call_reducer_args: Option<(bytes::Bytes, usize)>,
53
54    /// The standard sink used for [`Self::bytes_sink_write`].
55    standard_bytes_sink: Option<Vec<u8>>,
56
57    /// The slab of `BufferIters` created for this instance.
58    iters: RowIters,
59
60    /// Track time spent in module-defined spans.
61    timing_spans: TimingSpanSet,
62
63    /// The point in time the last reducer call started at.
64    reducer_start: Instant,
65
66    /// Track time spent in all wasm instance env calls (aka syscall time).
67    ///
68    /// Each function, like `insert`, will add the `Duration` spent in it
69    /// to this tracker.
70    call_times: CallTimes,
71
72    /// The last, including current, reducer to be executed by this environment.
73    reducer_name: String,
74    /// A pool of unused allocated chunks that can be reused.
75    // TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
76    chunk_pool: ChunkPool,
77}
78
79const CALL_REDUCER_ARGS_SOURCE: u32 = 1;
80const STANDARD_BYTES_SINK: u32 = 1;
81
82type WasmResult<T> = Result<T, WasmError>;
83type RtResult<T> = anyhow::Result<T>;
84
85/// Wraps an `InstanceEnv` with the magic necessary to push
86/// and pull bytes from webassembly memory.
87impl WasmInstanceEnv {
88    /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`.
89    pub fn new(instance_env: InstanceEnv) -> Self {
90        let reducer_start = Instant::now();
91        Self {
92            instance_env,
93            mem: None,
94            call_reducer_args: None,
95            standard_bytes_sink: None,
96            iters: Default::default(),
97            timing_spans: Default::default(),
98            reducer_start,
99            call_times: CallTimes::new(),
100            reducer_name: String::from("<initializing>"),
101            chunk_pool: <_>::default(),
102        }
103    }
104
105    /// Finish the instantiation of this instance with the provided `Mem`.
106    pub fn instantiate(&mut self, mem: Mem) {
107        assert!(self.mem.is_none());
108        self.mem = Some(mem);
109    }
110
111    /// Returns a reference to the memory, assumed to be initialized.
112    pub fn get_mem(&self) -> Mem {
113        self.mem.expect("Initialized memory")
114    }
115    fn mem_env<'a>(ctx: impl Into<StoreContextMut<'a, Self>>) -> (&'a mut MemView, &'a mut Self) {
116        let ctx = ctx.into();
117        let mem = ctx.data().get_mem();
118        mem.view_and_store_mut(ctx)
119    }
120
121    /// Return a reference to the `InstanceEnv`,
122    /// which is responsible for DB instance and associated state.
123    pub fn instance_env(&self) -> &InstanceEnv {
124        &self.instance_env
125    }
126
127    /// Setup the standard bytes sink and return a handle to it for writing.
128    pub fn setup_standard_bytes_sink(&mut self) -> u32 {
129        self.standard_bytes_sink = Some(Vec::new());
130        STANDARD_BYTES_SINK
131    }
132
133    /// Extract all the bytes written to the standard bytes sink
134    /// and prevent further writes to it.
135    pub fn take_standard_bytes_sink(&mut self) -> Vec<u8> {
136        self.standard_bytes_sink.take().unwrap_or_default()
137    }
138
139    /// Signal to this `WasmInstanceEnv` that a reducer call is beginning.
140    ///
141    /// Returns the handle used by reducers to read from `args`
142    /// as well as the handle used to write the error message, if any.
143    pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (u32, u32) {
144        let errors = self.setup_standard_bytes_sink();
145
146        // Pass an invalid source when the reducer args were empty.
147        // This allows the module to avoid allocating and make a system call in those cases.
148        self.call_reducer_args = (!args.is_empty()).then_some((args, 0));
149        let args = if self.call_reducer_args.is_some() {
150            CALL_REDUCER_ARGS_SOURCE
151        } else {
152            0
153        };
154
155        self.reducer_start = Instant::now();
156        name.clone_into(&mut self.reducer_name);
157        self.instance_env.start_reducer(ts);
158
159        (args, errors)
160    }
161
162    /// Returns the name of the most recent reducer to be run in this environment.
163    pub fn reducer_name(&self) -> &str {
164        &self.reducer_name
165    }
166
167    /// Returns the name of the most recent reducer to be run in this environment.
168    pub fn reducer_start(&self) -> Instant {
169        self.reducer_start
170    }
171
172    /// Signal to this `WasmInstanceEnv` that a reducer call is over.
173    /// This resets all of the state associated to a single reducer call,
174    /// and returns instrumentation records.
175    pub fn finish_reducer(&mut self) -> (ExecutionTimings, Vec<u8>) {
176        // For the moment,
177        // we only explicitly clear the source/sink buffers and the "syscall" times.
178        // TODO: should we be clearing `iters` and/or `timing_spans`?
179
180        let total_duration = self.reducer_start.elapsed();
181
182        // Taking the call times record also resets timings to 0s for the next call.
183        let wasm_instance_env_call_times = self.call_times.take();
184
185        let timings = ExecutionTimings {
186            total_duration,
187            wasm_instance_env_call_times,
188        };
189
190        self.call_reducer_args = None;
191        (timings, self.take_standard_bytes_sink())
192    }
193
194    fn with_span<R>(mut caller: Caller<'_, Self>, func: AbiCall, run: impl FnOnce(&mut Caller<'_, Self>) -> R) -> R {
195        let span_start = span::CallSpanStart::new(func);
196
197        // Call `run` with the caller and a handle to the memory.
198        let result = run(&mut caller);
199
200        // Track the span of this call.
201        let span = span_start.end();
202        span::record_span(&mut caller.data_mut().call_times, span);
203
204        result
205    }
206
207    fn convert_wasm_result<T: From<u16>>(func: AbiCall, err: WasmError) -> RtResult<T> {
208        Err(match err {
209            WasmError::Db(err) => match err_to_errno(&err) {
210                Some(errno) => {
211                    log::debug!(
212                        "abi call to {func} returned an errno: {errno} ({})",
213                        errno::strerror(errno).unwrap_or("<unknown>")
214                    );
215                    return Ok(errno.get().into());
216                }
217                None => anyhow::Error::from(AbiRuntimeError { func, err }),
218            },
219            WasmError::BufferTooSmall => return Ok(errno::BUFFER_TOO_SMALL.get().into()),
220            WasmError::Wasm(err) => err,
221        })
222    }
223
224    /// Call the function `run` with the name `func`.
225    /// The function `run` is provided with the callers environment and the host's memory.
226    ///
227    /// One of `cvt_custom`, `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
228    /// host call, to provide consistent error handling and instrumentation.
229    ///
230    /// Some database errors are logged but are otherwise regarded as `Ok(_)`.
231    /// See `err_to_errno` for a list.
232    ///
233    /// This variant should be used when more control is needed over the success value.
234    fn cvt_custom<T: From<u16>>(
235        caller: Caller<'_, Self>,
236        func: AbiCall,
237        run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<T>,
238    ) -> RtResult<T> {
239        Self::with_span(caller, func, run).or_else(|err| Self::convert_wasm_result(func, err))
240    }
241
242    /// Call the function `run` with the name `func`.
243    /// The function `run` is provided with the callers environment and the host's memory.
244    ///
245    /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
246    /// host call, to provide consistent error handling and instrumentation.
247    ///
248    /// Some database errors are logged but are otherwise regarded as `Ok(_)`.
249    /// See `err_to_errno` for a list.
250    fn cvt<T: From<u16>>(
251        caller: Caller<'_, Self>,
252        func: AbiCall,
253        run: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<()>,
254    ) -> RtResult<T> {
255        Self::cvt_custom(caller, func, |c| run(c).map(|()| 0u16.into()))
256    }
257
258    /// Call the function `f` with any return value being written to the pointer `out`.
259    ///
260    /// Otherwise, `cvt_ret` (this function) behaves as `cvt`.
261    ///
262    /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
263    /// host call, to provide consistent error handling and instrumentation.
264    ///
265    /// This method should be used as opposed to a manual implementation,
266    /// as it helps with upholding the safety invariants of [`bindings_sys::call`].
267    ///
268    /// Returns an error if writing `T` to `out` errors.
269    fn cvt_ret<O: WasmPointee>(
270        caller: Caller<'_, Self>,
271        call: AbiCall,
272        out: WasmPtr<O>,
273        f: impl FnOnce(&mut Caller<'_, Self>) -> WasmResult<O>,
274    ) -> RtResult<u32> {
275        Self::cvt(caller, call, |caller| {
276            f(caller).and_then(|ret| {
277                let (mem, _) = Self::mem_env(caller);
278                ret.write_to(mem, out).map_err(|e| e.into())
279            })
280        })
281    }
282
283    /// Call the function `f`.
284    ///
285    /// This is the version of `cvt` or `cvt_ret` for functions with no return value.
286    /// One of `cvt`, `cvt_ret`, or `cvt_noret` should be used in the implementation of any
287    /// host call, to provide consistent error handling and instrumentation.
288    fn cvt_noret(caller: Caller<'_, Self>, call: AbiCall, f: impl FnOnce(&mut Caller<'_, Self>)) {
289        Self::with_span(caller, call, f)
290    }
291
292    fn convert_u32_to_col_id(col_id: u32) -> WasmResult<ColId> {
293        let col_id: u16 = col_id
294            .try_into()
295            .context("ABI violation, a `ColId` must be a `u16`")
296            .map_err(WasmError::Wasm)?;
297        Ok(col_id.into())
298    }
299
300    /// Queries the `table_id` associated with the given (table) `name`
301    /// where `name` is the UTF-8 slice in WASM memory at `name_ptr[..name_len]`.
302    ///
303    /// The table id is written into the `out` pointer.
304    ///
305    /// # Traps
306    ///
307    /// Traps if:
308    /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
309    /// - `name` is not valid UTF-8.
310    /// - `out` is NULL or `out[..size_of::<TableId>()]` is not in bounds of WASM memory.
311    ///
312    /// # Errors
313    ///
314    /// Returns an error:
315    ///
316    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
317    /// - `NO_SUCH_TABLE`, when `name` is not the name of a table.
318    #[tracing::instrument(level = "trace", skip_all)]
319    pub fn table_id_from_name(
320        caller: Caller<'_, Self>,
321        name: WasmPtr<u8>,
322        name_len: u32,
323        out: WasmPtr<u32>,
324    ) -> RtResult<u32> {
325        Self::cvt_ret::<u32>(caller, AbiCall::TableIdFromName, out, |caller| {
326            let (mem, env) = Self::mem_env(caller);
327            // Read the table name from WASM memory.
328            let name = mem.deref_str(name, name_len)?;
329
330            // Query the table id.
331            Ok(env.instance_env.table_id_from_name(name)?.into())
332        })
333    }
334
335    /// Queries the `index_id` associated with the given (index) `name`
336    /// where `name` is the UTF-8 slice in WASM memory at `name_ptr[..name_len]`.
337    ///
338    /// The index id is written into the `out` pointer.
339    ///
340    /// # Traps
341    ///
342    /// Traps if:
343    /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
344    /// - `name` is not valid UTF-8.
345    /// - `out` is NULL or `out[..size_of::<IndexId>()]` is not in bounds of WASM memory.
346    ///
347    /// # Errors
348    ///
349    /// Returns an error:
350    ///
351    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
352    /// - `NO_SUCH_INDEX`, when `name` is not the name of an index.
353    #[tracing::instrument(level = "trace", skip_all)]
354    pub fn index_id_from_name(
355        caller: Caller<'_, Self>,
356        name: WasmPtr<u8>,
357        name_len: u32,
358        out: WasmPtr<u32>,
359    ) -> RtResult<u32> {
360        Self::cvt_ret::<u32>(caller, AbiCall::IndexIdFromName, out, |caller| {
361            let (mem, env) = Self::mem_env(caller);
362            // Read the index name from WASM memory.
363            let name = mem.deref_str(name, name_len)?;
364
365            // Query the index id.
366            Ok(env.instance_env.index_id_from_name(name)?.into())
367        })
368    }
369
370    /// Writes the number of rows currently in table identified by `table_id` to `out`.
371    ///
372    /// # Traps
373    ///
374    /// Traps if:
375    /// - `out` is NULL or `out[..size_of::<u64>()]` is not in bounds of WASM memory.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error:
380    ///
381    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
382    /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
383    #[tracing::instrument(level = "trace", skip_all)]
384    pub fn datastore_table_row_count(caller: Caller<'_, Self>, table_id: u32, out: WasmPtr<u64>) -> RtResult<u32> {
385        Self::cvt_ret::<u64>(caller, AbiCall::DatastoreTableRowCount, out, |caller| {
386            let (_, env) = Self::mem_env(caller);
387            Ok(env.instance_env.datastore_table_row_count(table_id.into())?)
388        })
389    }
390
391    /// Starts iteration on each row, as BSATN-encoded, of a table identified by `table_id`.
392    ///
393    /// On success, the iterator handle is written to the `out` pointer.
394    /// This handle can be advanced by [`row_iter_bsatn_advance`].
395    ///
396    /// # Traps
397    ///
398    /// This function does not trap.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error:
403    ///
404    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
405    /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
406    // #[tracing::instrument(level = "trace", skip_all)]
407    pub fn datastore_table_scan_bsatn(
408        caller: Caller<'_, Self>,
409        table_id: u32,
410        out: WasmPtr<RowIterIdx>,
411    ) -> RtResult<u32> {
412        Self::cvt_ret(caller, AbiCall::DatastoreTableScanBsatn, out, |caller| {
413            let env = caller.data_mut();
414            // Collect the iterator chunks.
415            let chunks = env
416                .instance_env
417                .datastore_table_scan_bsatn_chunks(&mut env.chunk_pool, table_id.into())?;
418            // Register the iterator and get back the index to write to `out`.
419            // Calls to the iterator are done through dynamic dispatch.
420            Ok(env.iters.insert(chunks.into_iter()))
421        })
422    }
423
424    /// Finds all rows in the index identified by `index_id`,
425    /// according to the:
426    /// - `prefix = prefix_ptr[..prefix_len]`,
427    /// - `rstart = rstart_ptr[..rstart_len]`,
428    /// - `rend = rend_ptr[..rend_len]`,
429    ///
430    /// in WASM memory.
431    ///
432    /// The index itself has a schema/type.
433    /// The `prefix` is decoded to the initial `prefix_elems` `AlgebraicType`s
434    /// whereas `rstart` and `rend` are decoded to the `prefix_elems + 1` `AlgebraicType`
435    /// where the `AlgebraicValue`s are wrapped in `Bound`.
436    /// That is, `rstart, rend` are BSATN-encoded `Bound<AlgebraicValue>`s.
437    ///
438    /// Matching is then defined by equating `prefix`
439    /// to the initial `prefix_elems` columns of the index
440    /// and then imposing `rstart` as the starting bound
441    /// and `rend` as the ending bound on the `prefix_elems + 1` column of the index.
442    /// Remaining columns of the index are then unbounded.
443    /// Note that the `prefix` in this case can be empty (`prefix_elems = 0`),
444    /// in which case this becomes a ranged index scan on a single-col index
445    /// or even a full table scan if `rstart` and `rend` are both unbounded.
446    ///
447    /// The relevant table for the index is found implicitly via the `index_id`,
448    /// which is unique for the module.
449    ///
450    /// On success, the iterator handle is written to the `out` pointer.
451    /// This handle can be advanced by [`row_iter_bsatn_advance`].
452    ///
453    /// # Non-obvious queries
454    ///
455    /// For an index on columns `[a, b, c]`:
456    ///
457    /// - `a = x, b = y` is encoded as a prefix `[x, y]`
458    ///   and a range `Range::Unbounded`,
459    ///   or as a  prefix `[x]` and a range `rstart = rend = Range::Inclusive(y)`.
460    /// - `a = x, b = y, c = z` is encoded as a prefix `[x, y]`
461    ///   and a  range `rstart = rend = Range::Inclusive(z)`.
462    /// - A sorted full scan is encoded as an empty prefix
463    ///   and a range `Range::Unbounded`.
464    ///
465    /// # Traps
466    ///
467    /// Traps if:
468    /// - `prefix_elems > 0`
469    ///   and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
470    /// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
471    /// - `rend` is NULL or `rend` is not in bounds of WASM memory.
472    /// - `out` is NULL or `out[..size_of::<RowIter>()]` is not in bounds of WASM memory.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error:
477    ///
478    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
479    /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
480    /// - `WRONG_INDEX_ALGO` if the index is not a range-scan compatible index.
481    /// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
482    ///   a `prefix_elems` number of `AlgebraicValue`
483    ///   typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
484    ///   Or when `rstart` or `rend` cannot be decoded to an `Bound<AlgebraicValue>`
485    ///   where the inner `AlgebraicValue`s are
486    ///   typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
487    pub fn datastore_index_scan_range_bsatn(
488        caller: Caller<'_, Self>,
489        index_id: u32,
490        prefix_ptr: WasmPtr<u8>,
491        prefix_len: u32,
492        prefix_elems: u32,
493        rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
494        rstart_len: u32,
495        rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
496        rend_len: u32,
497        out: WasmPtr<RowIterIdx>,
498    ) -> RtResult<u32> {
499        Self::cvt_ret(caller, AbiCall::DatastoreIndexScanRangeBsatn, out, |caller| {
500            let prefix_elems = Self::convert_u32_to_col_id(prefix_elems)?;
501
502            let (mem, env) = Self::mem_env(caller);
503            // Read the prefix and range start & end from WASM memory.
504            let prefix = if prefix_elems.idx() == 0 {
505                &[]
506            } else {
507                mem.deref_slice(prefix_ptr, prefix_len)?
508            };
509            let rstart = mem.deref_slice(rstart_ptr, rstart_len)?;
510            let rend = mem.deref_slice(rend_ptr, rend_len)?;
511
512            // Find the relevant rows.
513            let chunks = env.instance_env.datastore_index_scan_range_bsatn_chunks(
514                &mut env.chunk_pool,
515                index_id.into(),
516                prefix,
517                prefix_elems,
518                rstart,
519                rend,
520            )?;
521
522            // Insert the encoded + concatenated rows into a new buffer and return its id.
523            Ok(env.iters.insert(chunks.into_iter()))
524        })
525    }
526
527    /// Deprecated name for [`Self::datastore_index_scan_range_bsatn`].
528    #[deprecated = "use `datastore_index_scan_range_bsatn` instead"]
529    pub fn datastore_btree_scan_bsatn(
530        caller: Caller<'_, Self>,
531        index_id: u32,
532        prefix_ptr: WasmPtr<u8>,
533        prefix_len: u32,
534        prefix_elems: u32,
535        rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
536        rstart_len: u32,
537        rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
538        rend_len: u32,
539        out: WasmPtr<RowIterIdx>,
540    ) -> RtResult<u32> {
541        Self::datastore_index_scan_range_bsatn(
542            caller,
543            index_id,
544            prefix_ptr,
545            prefix_len,
546            prefix_elems,
547            rstart_ptr,
548            rstart_len,
549            rend_ptr,
550            rend_len,
551            out,
552        )
553    }
554
555    /// Reads rows from the given iterator registered under `iter`.
556    ///
557    /// Takes rows from the iterator
558    /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`,
559    /// encoded in BSATN format.
560    ///
561    /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
562    /// On success (`0` or `-1` is returned),
563    /// `buffer_len` is set to the combined length of the encoded rows.
564    /// When `-1` is returned, the iterator has been exhausted
565    /// and there are no more rows to read,
566    /// leading to the iterator being immediately destroyed.
567    /// Note that the host is free to reuse allocations in a pool,
568    /// destroying the handle logically does not entail that memory is necessarily reclaimed.
569    ///
570    /// # Traps
571    ///
572    /// Traps if:
573    ///
574    /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
575    /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
576    ///
577    /// # Errors
578    ///
579    /// Returns an error:
580    ///
581    /// - `NO_SUCH_ITER`, when `iter` is not a valid iterator.
582    /// - `BUFFER_TOO_SMALL`, when there are rows left but they cannot fit in `buffer`.
583    ///   When this occurs, `buffer_len` is set to the size of the next item in the iterator.
584    ///   To make progress, the caller should reallocate the buffer to at least that size and try again.
585    // #[tracing::instrument(level = "trace", skip_all)]
586    pub fn row_iter_bsatn_advance(
587        caller: Caller<'_, Self>,
588        iter: u32,
589        buffer_ptr: WasmPtr<u8>,
590        buffer_len_ptr: WasmPtr<u32>,
591    ) -> RtResult<i32> {
592        let row_iter_idx = RowIterIdx(iter);
593        Self::cvt_custom(caller, AbiCall::RowIterBsatnAdvance, |caller| {
594            let (mem, env) = Self::mem_env(caller);
595
596            // Retrieve the iterator by `row_iter_idx`, or error.
597            let Some(iter) = env.iters.get_mut(row_iter_idx) else {
598                return Ok(errno::NO_SUCH_ITER.get().into());
599            };
600
601            // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
602            let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
603            let write_buffer_len = |mem, len| u32::try_from(len).unwrap().write_to(mem, buffer_len_ptr);
604            // Get a mutable view to the `buffer`.
605            let mut buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?;
606
607            let mut written = 0;
608            // Fill the buffer as much as possible.
609            while let Some(chunk) = iter.as_slice().first() {
610                let Some((buf_chunk, rest)) = buffer.split_at_mut_checked(chunk.len()) else {
611                    // Cannot fit chunk into the buffer,
612                    // either because we already filled it too much,
613                    // or because it is too small.
614                    break;
615                };
616                buf_chunk.copy_from_slice(chunk);
617                written += chunk.len();
618                buffer = rest;
619
620                // Advance the iterator, as we used a chunk.
621                // SAFETY: We peeked one `chunk`, so there must be one at least.
622                let chunk = unsafe { iter.next().unwrap_unchecked() };
623                env.chunk_pool.put(chunk);
624            }
625
626            let ret = match (written, iter.as_slice().first()) {
627                // Nothing was written and the iterator is not exhausted.
628                (0, Some(chunk)) => {
629                    write_buffer_len(mem, chunk.len())?;
630                    return Ok(errno::BUFFER_TOO_SMALL.get().into());
631                }
632                // The iterator is exhausted, destroy it, and tell the caller.
633                (_, None) => {
634                    env.iters.take(row_iter_idx);
635                    -1
636                }
637                // Something was written, but the iterator is not exhausted.
638                (_, Some(_)) => 0,
639            };
640            write_buffer_len(mem, written)?;
641            Ok(ret)
642        })
643    }
644
645    /// Destroys the iterator registered under `iter`.
646    ///
647    /// Once `row_iter_bsatn_close` is called on `iter`, the `iter` is invalid.
648    /// That is, `row_iter_bsatn_close(iter)` the second time will yield `NO_SUCH_ITER`.
649    ///
650    /// # Errors
651    ///
652    /// Returns an error:
653    ///
654    /// - `NO_SUCH_ITER`, when `iter` is not a valid iterator.
655    // #[tracing::instrument(level = "trace", skip_all)]
656    pub fn row_iter_bsatn_close(caller: Caller<'_, Self>, iter: u32) -> RtResult<u32> {
657        let row_iter_idx = RowIterIdx(iter);
658        Self::cvt_custom(caller, AbiCall::RowIterBsatnClose, |caller| {
659            let (_, env) = Self::mem_env(caller);
660
661            // Retrieve the iterator by `row_iter_idx`, or error.
662            Ok(match env.iters.take(row_iter_idx) {
663                None => errno::NO_SUCH_ITER.get().into(),
664                // TODO(Centril): consider putting these into a pool for reuse.
665                Some(_) => 0,
666            })
667        })
668    }
669
670    /// Inserts a row into the table identified by `table_id`,
671    /// where the row is read from the byte string `row = row_ptr[..row_len]` in WASM memory
672    /// where `row_len = row_len_ptr[..size_of::<usize>()]` stores the capacity of `row`.
673    ///
674    /// The byte string `row` must be a BSATN-encoded `ProductValue`
675    /// typed at the table's `ProductType` row-schema.
676    ///
677    /// To handle auto-incrementing columns,
678    /// when the call is successful,
679    /// the `row` is written back to with the generated sequence values.
680    /// These values are written as a BSATN-encoded `pv: ProductValue`.
681    /// Each `v: AlgebraicValue` in `pv` is typed at the sequence's column type.
682    /// The `v`s in `pv` are ordered by the order of the columns, in the schema of the table.
683    /// When the table has no sequences,
684    /// this implies that the `pv`, and thus `row`, will be empty.
685    /// The `row_len` is set to the length of `bsatn(pv)`.
686    ///
687    /// # Traps
688    ///
689    /// Traps if:
690    /// - `row_len_ptr` is NULL or `row_len` is not in bounds of WASM memory.
691    /// - `row_ptr` is NULL or `row` is not in bounds of WASM memory.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error:
696    ///
697    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
698    /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
699    /// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`.
700    ///   typed at the `ProductType` the table's schema specifies.
701    /// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
702    /// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
703    #[tracing::instrument(level = "trace", skip_all)]
704    pub fn datastore_insert_bsatn(
705        caller: Caller<'_, Self>,
706        table_id: u32,
707        row_ptr: WasmPtr<u8>,
708        row_len_ptr: WasmPtr<u32>,
709    ) -> RtResult<u32> {
710        Self::cvt(caller, AbiCall::DatastoreInsertBsatn, |caller| {
711            let (mem, env) = Self::mem_env(caller);
712
713            // Read `row-len`, i.e., the capacity of `row` pointed to by `row_ptr`.
714            let row_len = u32::read_from(mem, row_len_ptr)?;
715            // Get a mutable view to the `row`.
716            let row = mem.deref_slice_mut(row_ptr, row_len)?;
717
718            // Insert the row into the DB and write back the generated column values.
719            let row_len = env.instance_env.insert(table_id.into(), row)?;
720            u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?;
721            Ok(())
722        })
723    }
724
725    /// Updates a row in the table identified by `table_id` to `row`
726    /// where the row is read from the byte string `row = row_ptr[..row_len]` in WASM memory
727    /// where `row_len = row_len_ptr[..size_of::<usize>()]` stores the capacity of `row`.
728    ///
729    /// The byte string `row` must be a BSATN-encoded `ProductValue`
730    /// typed at the table's `ProductType` row-schema.
731    ///
732    /// The row to update is found by projecting `row`
733    /// to the type of the *unique* index identified by `index_id`.
734    /// If no row is found, the error `NO_SUCH_ROW` is returned.
735    ///
736    /// To handle auto-incrementing columns,
737    /// when the call is successful,
738    /// the `row` is written back to with the generated sequence values.
739    /// These values are written as a BSATN-encoded `pv: ProductValue`.
740    /// Each `v: AlgebraicValue` in `pv` is typed at the sequence's column type.
741    /// The `v`s in `pv` are ordered by the order of the columns, in the schema of the table.
742    /// When the table has no sequences,
743    /// this implies that the `pv`, and thus `row`, will be empty.
744    /// The `row_len` is set to the length of `bsatn(pv)`.
745    ///
746    /// # Traps
747    ///
748    /// Traps if:
749    /// - `row_len_ptr` is NULL or `row_len` is not in bounds of WASM memory.
750    /// - `row_ptr` is NULL or `row` is not in bounds of WASM memory.
751    ///
752    /// # Errors
753    ///
754    /// Returns an error:
755    ///
756    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
757    /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
758    /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
759    /// - `INDEX_NOT_UNIQUE`, when the index was not unique.
760    /// - `BSATN_DECODE_ERROR`, when `row` cannot be decoded to a `ProductValue`
761    ///    typed at the `ProductType` the table's schema specifies
762    ///    or when it cannot be projected to the index identified by `index_id`.
763    /// - `NO_SUCH_ROW`, when the row was not found in the unique index.
764    /// - `UNIQUE_ALREADY_EXISTS`, when inserting `row` would violate a unique constraint.
765    /// - `SCHEDULE_AT_DELAY_TOO_LONG`, when the delay specified in the row was too long.
766    #[tracing::instrument(level = "trace", skip_all)]
767    pub fn datastore_update_bsatn(
768        caller: Caller<'_, Self>,
769        table_id: u32,
770        index_id: u32,
771        row_ptr: WasmPtr<u8>,
772        row_len_ptr: WasmPtr<u32>,
773    ) -> RtResult<u32> {
774        Self::cvt(caller, AbiCall::DatastoreUpdateBsatn, |caller| {
775            let (mem, env) = Self::mem_env(caller);
776
777            // Read `row-len`, i.e., the capacity of `row` pointed to by `row_ptr`.
778            let row_len = u32::read_from(mem, row_len_ptr)?;
779            // Get a mutable view to the `row`.
780            let row = mem.deref_slice_mut(row_ptr, row_len)?;
781
782            // Update the row in the DB and write back the generated column values.
783            let row_len = env.instance_env.update(table_id.into(), index_id.into(), row)?;
784            u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?;
785            Ok(())
786        })
787    }
788
789    /// Deletes all rows found in the index identified by `index_id`,
790    /// according to the:
791    /// - `prefix = prefix_ptr[..prefix_len]`,
792    /// - `rstart = rstart_ptr[..rstart_len]`,
793    /// - `rend = rend_ptr[..rend_len]`,
794    ///
795    /// in WASM memory.
796    ///
797    /// This syscall will delete all the rows found by
798    /// [`datastore_index_scan_range_bsatn`] with the same arguments passed,
799    /// including `prefix_elems`.
800    /// See `datastore_index_scan_range_bsatn` for details.
801    ///
802    /// The number of rows deleted is written to the WASM pointer `out`.
803    ///
804    /// # Traps
805    ///
806    /// Traps if:
807    /// - `prefix_elems > 0`
808    ///   and (`prefix_ptr` is NULL or `prefix` is not in bounds of WASM memory).
809    /// - `rstart` is NULL or `rstart` is not in bounds of WASM memory.
810    /// - `rend` is NULL or `rend` is not in bounds of WASM memory.
811    /// - `out` is NULL or `out[..size_of::<u32>()]` is not in bounds of WASM memory.
812    ///
813    /// # Errors
814    ///
815    /// Returns an error:
816    ///
817    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
818    /// - `NO_SUCH_INDEX`, when `index_id` is not a known ID of an index.
819    /// - `WRONG_INDEX_ALGO` if the index is not a range-compatible index.
820    /// - `BSATN_DECODE_ERROR`, when `prefix` cannot be decoded to
821    ///   a `prefix_elems` number of `AlgebraicValue`
822    ///   typed at the initial `prefix_elems` `AlgebraicType`s of the index's key type.
823    ///   Or when `rstart` or `rend` cannot be decoded to an `Bound<AlgebraicValue>`
824    ///   where the inner `AlgebraicValue`s are
825    ///   typed at the `prefix_elems + 1` `AlgebraicType` of the index's key type.
826    pub fn datastore_delete_by_index_scan_range_bsatn(
827        caller: Caller<'_, Self>,
828        index_id: u32,
829        prefix_ptr: WasmPtr<u8>,
830        prefix_len: u32,
831        prefix_elems: u32,
832        rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
833        rstart_len: u32,
834        rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
835        rend_len: u32,
836        out: WasmPtr<u32>,
837    ) -> RtResult<u32> {
838        Self::cvt_ret(caller, AbiCall::DatastoreDeleteByIndexScanRangeBsatn, out, |caller| {
839            let prefix_elems = Self::convert_u32_to_col_id(prefix_elems)?;
840
841            let (mem, env) = Self::mem_env(caller);
842            // Read the prefix and range start & end from WASM memory.
843            let prefix = if prefix_elems.idx() == 0 {
844                &[]
845            } else {
846                mem.deref_slice(prefix_ptr, prefix_len)?
847            };
848            let rstart = mem.deref_slice(rstart_ptr, rstart_len)?;
849            let rend = mem.deref_slice(rend_ptr, rend_len)?;
850
851            // Delete the relevant rows.
852            Ok(env.instance_env.datastore_delete_by_index_scan_range_bsatn(
853                index_id.into(),
854                prefix,
855                prefix_elems,
856                rstart,
857                rend,
858            )?)
859        })
860    }
861
862    /// Deprecated name for [`Self::datastore_delete_by_index_scan_range_bsatn`].
863    #[deprecated = "use `datastore_delete_by_index_scan_range_bsatn` instead"]
864    pub fn datastore_delete_by_btree_scan_bsatn(
865        caller: Caller<'_, Self>,
866        index_id: u32,
867        prefix_ptr: WasmPtr<u8>,
868        prefix_len: u32,
869        prefix_elems: u32,
870        rstart_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
871        rstart_len: u32,
872        rend_ptr: WasmPtr<u8>, // Bound<AlgebraicValue>
873        rend_len: u32,
874        out: WasmPtr<u32>,
875    ) -> RtResult<u32> {
876        Self::datastore_delete_by_index_scan_range_bsatn(
877            caller,
878            index_id,
879            prefix_ptr,
880            prefix_len,
881            prefix_elems,
882            rstart_ptr,
883            rstart_len,
884            rend_ptr,
885            rend_len,
886            out,
887        )
888    }
889
890    /// Deletes those rows, in the table identified by `table_id`,
891    /// that match any row in the byte string `rel = rel_ptr[..rel_len]` in WASM memory.
892    ///
893    /// Matching is defined by first BSATN-decoding
894    /// the byte string pointed to at by `relation` to a `Vec<ProductValue>`
895    /// according to the row schema of the table
896    /// and then using `Ord for AlgebraicValue`.
897    /// A match happens when `Ordering::Equal` is returned from `fn cmp`.
898    /// This occurs exactly when the row's BSATN-encoding is equal to the encoding of the `ProductValue`.
899    ///
900    /// The number of rows deleted is written to the WASM pointer `out`.
901    ///
902    /// # Traps
903    ///
904    /// Traps if:
905    /// - `rel_ptr` is NULL or `rel` is not in bounds of WASM memory.
906    /// - `out` is NULL or `out[..size_of::<u32>()]` is not in bounds of WASM memory.
907    ///
908    /// # Errors
909    ///
910    /// Returns an error:
911    ///
912    /// - `NOT_IN_TRANSACTION`, when called outside of a transaction.
913    /// - `NO_SUCH_TABLE`, when `table_id` is not a known ID of a table.
914    /// - `BSATN_DECODE_ERROR`, when `rel` cannot be decoded to `Vec<ProductValue>`
915    ///   where each `ProductValue` is typed at the `ProductType` the table's schema specifies.
916    #[tracing::instrument(level = "trace", skip_all)]
917    pub fn datastore_delete_all_by_eq_bsatn(
918        caller: Caller<'_, Self>,
919        table_id: u32,
920        rel_ptr: WasmPtr<u8>,
921        rel_len: u32,
922        out: WasmPtr<u32>,
923    ) -> RtResult<u32> {
924        Self::cvt_ret(caller, AbiCall::DatastoreDeleteAllByEqBsatn, out, |caller| {
925            let (mem, env) = Self::mem_env(caller);
926            let relation = mem.deref_slice(rel_ptr, rel_len)?;
927            Ok(env
928                .instance_env
929                .datastore_delete_all_by_eq_bsatn(table_id.into(), relation)?)
930        })
931    }
932
933    pub fn volatile_nonatomic_schedule_immediate(
934        caller: Caller<'_, Self>,
935        name: WasmPtr<u8>,
936        name_len: u32,
937        args: WasmPtr<u8>,
938        args_len: u32,
939    ) -> RtResult<()> {
940        Self::with_span(caller, AbiCall::VolatileNonatomicScheduleImmediate, |caller| {
941            let (mem, env) = Self::mem_env(caller);
942            let name = mem.deref_str(name, name_len)?;
943            let args = mem.deref_slice(args, args_len)?;
944            env.instance_env.scheduler.volatile_nonatomic_schedule_immediate(
945                name.to_owned(),
946                crate::host::ReducerArgs::Bsatn(args.to_vec().into()),
947            );
948
949            Ok(())
950        })
951    }
952
953    /// Reads bytes from `source`, registered in the host environment,
954    /// and stores them in the memory pointed to by `buffer = buffer_ptr[..buffer_len]`.
955    ///
956    /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
957    /// On success (`0` or `-1` is returned),
958    /// `buffer_len` is set to the number of bytes written to `buffer`.
959    /// When `-1` is returned, the resource has been exhausted
960    /// and there are no more bytes to read,
961    /// leading to the resource being immediately destroyed.
962    /// Note that the host is free to reuse allocations in a pool,
963    /// destroying the handle logically does not entail that memory is necessarily reclaimed.
964    ///
965    /// # Traps
966    ///
967    /// Traps if:
968    ///
969    /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
970    /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
971    ///
972    /// # Errors
973    ///
974    /// Returns an error:
975    ///
976    /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source.
977    ///
978    /// # Example
979    ///
980    /// The typical use case for this ABI is in `__call_reducer__`,
981    /// to read and deserialize the `args`.
982    /// An example definition, dealing with `args` might be:
983    /// ```rust,ignore
984    /// /// #[no_mangle]
985    /// extern "C" fn __call_reducer__(..., args: BytesSource, ...) -> i16 {
986    ///     // ...
987    ///
988    ///     let mut buf = Vec::<u8>::with_capacity(1024);
989    ///     loop {
990    ///         // Write into the spare capacity of the buffer.
991    ///         let buf_ptr = buf.spare_capacity_mut();
992    ///         let spare_len = buf_ptr.len();
993    ///         let mut buf_len = buf_ptr.len();
994    ///         let buf_ptr = buf_ptr.as_mut_ptr().cast();
995    ///         let ret = unsafe { bytes_source_read(args, buf_ptr, &mut buf_len) };
996    ///         // SAFETY: `bytes_source_read` just appended `spare_len` bytes to `buf`.
997    ///         unsafe { buf.set_len(buf.len() + spare_len) };
998    ///         match ret {
999    ///             // Host side source exhausted, we're done.
1000    ///             -1 => break,
1001    ///             // Wrote the entire spare capacity.
1002    ///             // Need to reserve more space in the buffer.
1003    ///             0 if spare_len == buf_len => buf.reserve(1024),
1004    ///             // Host didn't write as much as possible.
1005    ///             // Try to read some more.
1006    ///             // The host will likely not trigger this branch,
1007    ///             // but a module should be prepared for it.
1008    ///             0 => {}
1009    ///             _ => unreachable!(),
1010    ///         }
1011    ///     }
1012    ///
1013    ///     // ...
1014    /// }
1015    /// ```
1016    pub fn bytes_source_read(
1017        caller: Caller<'_, Self>,
1018        source: u32,
1019        buffer_ptr: WasmPtr<u8>,
1020        buffer_len_ptr: WasmPtr<u32>,
1021    ) -> RtResult<i32> {
1022        Self::cvt_custom(caller, AbiCall::BytesSourceRead, |caller| {
1023            let (mem, env) = Self::mem_env(caller);
1024
1025            // Retrieve the reducer args if available and requested, or error.
1026            let Some((reducer_args, cursor)) = env
1027                .call_reducer_args
1028                .as_mut()
1029                .filter(|_| source == CALL_REDUCER_ARGS_SOURCE)
1030            else {
1031                return Ok(errno::NO_SUCH_BYTES.get().into());
1032            };
1033
1034            // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
1035            let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
1036            // Get a mutable view to the `buffer`.
1037            let buffer = mem.deref_slice_mut(buffer_ptr, buffer_len)?;
1038            let buffer_len = buffer_len as usize;
1039
1040            // Derive the portion that we can read and what remains,
1041            // based on what is left to read and the capacity.
1042            let left_to_read = &reducer_args[*cursor..];
1043            let can_read_len = buffer_len.min(left_to_read.len());
1044            let (can_read, remainder) = left_to_read.split_at(can_read_len);
1045            // Copy to the `buffer` and write written bytes count to `buffer_len`.
1046            buffer[..can_read_len].copy_from_slice(can_read);
1047            (can_read_len as u32).write_to(mem, buffer_len_ptr)?;
1048
1049            // Destroy the source if exhausted, or advance `cursor`.
1050            if remainder.is_empty() {
1051                env.call_reducer_args = None;
1052                Ok(-1i32)
1053            } else {
1054                *cursor += can_read_len;
1055                Ok(0)
1056            }
1057        })
1058    }
1059
1060    /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`,
1061    /// to the `sink`, registered in the host environment.
1062    ///
1063    /// The `buffer_len = buffer_len_ptr[..size_of::<usize>()]` stores the capacity of `buffer`.
1064    /// On success (`0` is returned),
1065    /// `buffer_len` is set to the number of bytes written to `sink`.
1066    ///
1067    /// # Traps
1068    ///
1069    /// - `buffer_len_ptr` is NULL or `buffer_len` is not in bounds of WASM memory.
1070    /// - `buffer_ptr` is NULL or `buffer` is not in bounds of WASM memory.
1071    ///
1072    /// # Errors
1073    ///
1074    /// Returns an error:
1075    ///
1076    /// - `NO_SUCH_BYTES`, when `sink` is not a valid bytes sink.
1077    /// - `NO_SPACE`, when there is no room for more bytes in `sink`.
1078    ///   (Doesn't currently happen.)
1079    pub fn bytes_sink_write(
1080        caller: Caller<'_, Self>,
1081        sink: u32,
1082        buffer_ptr: WasmPtr<u8>,
1083        buffer_len_ptr: WasmPtr<u32>,
1084    ) -> RtResult<u32> {
1085        Self::cvt_custom(caller, AbiCall::BytesSinkWrite, |caller| {
1086            let (mem, env) = Self::mem_env(caller);
1087
1088            // Retrieve the reducer args if available and requested, or error.
1089            let Some(sink) = env.standard_bytes_sink.as_mut().filter(|_| sink == STANDARD_BYTES_SINK) else {
1090                return Ok(errno::NO_SUCH_BYTES.get().into());
1091            };
1092
1093            // Read `buffer_len`, i.e., the capacity of `buffer` pointed to by `buffer_ptr`.
1094            let buffer_len = u32::read_from(mem, buffer_len_ptr)?;
1095            // Write `buffer` to `sink`.
1096            let buffer = mem.deref_slice(buffer_ptr, buffer_len)?;
1097            sink.extend(buffer);
1098
1099            Ok(0)
1100        })
1101    }
1102
1103    /// Logs at `level` a `message` message occurring in `filename:line_number`
1104    /// with [`target`](target) being the module path at the `log!` invocation site.
1105    ///
1106    /// These various pointers are interpreted lossily as UTF-8 strings with a corresponding `_len`.
1107    ///
1108    /// The `target` and `filename` pointers are ignored by passing `NULL`.
1109    /// The line number is ignored if `line_number == u32::MAX`.
1110    ///
1111    /// No message is logged if
1112    /// - `target != NULL && target + target_len > u64::MAX`
1113    /// - `filename != NULL && filename + filename_len > u64::MAX`
1114    /// - `message + message_len > u64::MAX`
1115    ///
1116    /// # Traps
1117    ///
1118    /// Traps if:
1119    /// - `target` is not NULL and `target_ptr[..target_len]` is not in bounds of WASM memory.
1120    /// - `filename` is not NULL and `filename_ptr[..filename_len]` is not in bounds of WASM memory.
1121    /// - `message` is not NULL and `message_ptr[..message_len]` is not in bounds of WASM memory.
1122    ///
1123    /// [target]: https://docs.rs/log/latest/log/struct.Record.html#method.target
1124    #[tracing::instrument(level = "trace", skip_all)]
1125    pub fn console_log(
1126        caller: Caller<'_, Self>,
1127        level: u32,
1128        target_ptr: WasmPtr<u8>,
1129        target_len: u32,
1130        filename_ptr: WasmPtr<u8>,
1131        filename_len: u32,
1132        line_number: u32,
1133        message_ptr: WasmPtr<u8>,
1134        message_len: u32,
1135    ) {
1136        let do_console_log = |caller: &mut Caller<'_, Self>| -> WasmResult<()> {
1137            let env = caller.data();
1138            let mem = env.get_mem().view(&caller);
1139
1140            // Read the `target`, `filename`, and `message` strings from WASM memory.
1141            let target = mem.deref_str_lossy(target_ptr, target_len).check_nullptr()?;
1142            let filename = mem.deref_str_lossy(filename_ptr, filename_len).check_nullptr()?;
1143            let message = mem.deref_str_lossy(message_ptr, message_len)?;
1144
1145            // The line number cannot be `u32::MAX` as this represents `Option::None`.
1146            let line_number = (line_number != u32::MAX).then_some(line_number);
1147
1148            let record = Record {
1149                // TODO: figure out whether to use walltime now or logical reducer now (env.reducer_start)
1150                ts: chrono::Utc::now(),
1151                target: target.as_deref(),
1152                filename: filename.as_deref(),
1153                line_number,
1154                message: &message,
1155            };
1156
1157            // Write the log record to the `DatabaseLogger` in the database instance context (replica_ctx).
1158            env.instance_env
1159                .console_log((level as u8).into(), &record, &caller.as_context());
1160            Ok(())
1161        };
1162        Self::cvt_noret(caller, AbiCall::ConsoleLog, |caller| {
1163            let _ = do_console_log(caller);
1164        })
1165    }
1166
1167    /// Begins a timing span with `name = name_ptr[..name_len]`.
1168    ///
1169    /// When the returned `ConsoleTimerId` is passed to [`console_timer_end`],
1170    /// the duration between the calls will be printed to the module's logs.
1171    ///
1172    /// The `name` is interpreted lossily as a UTF-8 string.
1173    ///
1174    /// # Traps
1175    ///
1176    /// Traps if:
1177    /// - `name_ptr` is NULL or `name` is not in bounds of WASM memory.
1178    pub fn console_timer_start(caller: Caller<'_, Self>, name_ptr: WasmPtr<u8>, name_len: u32) -> RtResult<u32> {
1179        Self::with_span(caller, AbiCall::ConsoleTimerStart, |caller| {
1180            let (mem, env) = Self::mem_env(caller);
1181            let name = mem.deref_str_lossy(name_ptr, name_len)?.into_owned();
1182            Ok(env.timing_spans.insert(TimingSpan::new(name)).0)
1183        })
1184    }
1185
1186    pub fn console_timer_end(caller: Caller<'_, Self>, span_id: u32) -> RtResult<u32> {
1187        Self::cvt_custom(caller, AbiCall::ConsoleTimerEnd, |caller| {
1188            let Some(span) = caller.data_mut().timing_spans.take(TimingSpanIdx(span_id)) else {
1189                return Ok(errno::NO_SUCH_CONSOLE_TIMER.get().into());
1190            };
1191
1192            let elapsed = span.start.elapsed();
1193            let message = format!("Timing span {:?}: {:?}", &span.name, elapsed);
1194
1195            let record = Record {
1196                ts: chrono::Utc::now(),
1197                target: None,
1198                filename: None,
1199                line_number: None,
1200                message: &message,
1201            };
1202            caller.data().instance_env.console_log(
1203                crate::database_logger::LogLevel::Info,
1204                &record,
1205                &caller.as_context(),
1206            );
1207            Ok(0)
1208        })
1209    }
1210
1211    /// Writes the identity of the module into `out = out_ptr[..32]`.
1212    ///
1213    /// # Traps
1214    ///
1215    /// Traps if:
1216    ///
1217    /// - `out_ptr` is NULL or `out` is not in bounds of WASM memory.
1218    pub fn identity(caller: Caller<'_, Self>, out_ptr: WasmPtr<u8>) -> RtResult<()> {
1219        // Use `with_span` rather than one of the `cvt_*` functions,
1220        // as we want to possibly trap, but not to return an error code.
1221        Self::with_span(caller, AbiCall::Identity, |caller| {
1222            let (mem, env) = Self::mem_env(caller);
1223            let identity = env.instance_env.replica_ctx.database.database_identity;
1224            // We're implicitly casting `out_ptr` to `WasmPtr<Identity>` here.
1225            // (Both types are actually `u32`.)
1226            // This works because `Identity::write_to` does not require an aligned pointer,
1227            // as it gets a `&mut [u8]` from WASM memory and does `copy_from_slice` with it.
1228            identity.write_to(mem, out_ptr)?;
1229            Ok(())
1230        })
1231    }
1232}
1233
1234impl<T> BacktraceProvider for wasmtime::StoreContext<'_, T> {
1235    fn capture(&self) -> Box<dyn ModuleBacktrace> {
1236        Box::new(wasmtime::WasmBacktrace::capture(self))
1237    }
1238}
1239
1240impl ModuleBacktrace for wasmtime::WasmBacktrace {
1241    fn frames(&self) -> Vec<BacktraceFrame<'_>> {
1242        self.frames()
1243            .iter()
1244            .map(|f| BacktraceFrame {
1245                module_name: None,
1246                func_name: f.func_name(),
1247            })
1248            .collect()
1249    }
1250}