spacetimedb/
execution_context.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use derive_more::Display;
5use spacetimedb_commitlog::{payload::txdata, Varchar};
6use spacetimedb_lib::{ConnectionId, Identity, Timestamp};
7use spacetimedb_sats::bsatn;
8
9/// Represents the context under which a database runtime method is executed.
10/// In particular it provides details about the currently executing txn to runtime operations.
11/// More generally it acts as a container for information that database operations may require to function correctly.
12#[derive(Clone)]
13pub struct ExecutionContext {
14    /// The identity of the database on which a transaction is being executed.
15    pub database_identity: Identity,
16    /// The reducer from which the current transaction originated.
17    pub reducer: Option<ReducerContext>,
18    /// The type of workload that is being executed.
19    pub workload: WorkloadType,
20}
21
22/// If an [`ExecutionContext`] is a reducer context, describes the reducer.
23///
24/// Note that this information is written to persistent storage.
25#[derive(Clone)]
26pub struct ReducerContext {
27    /// The name of the reducer.
28    pub name: String,
29    /// The [`Identity`] of the caller.
30    pub caller_identity: Identity,
31    /// The [`ConnectionId`] of the caller.
32    pub caller_connection_id: ConnectionId,
33    /// The timestamp of the reducer invocation.
34    pub timestamp: Timestamp,
35    /// The BSATN-encoded arguments given to the reducer.
36    ///
37    /// Note that [`Bytes`] is a refcounted value, but the memory it points to
38    /// can be large-ish. The reference should be freed as soon as possible.
39    pub arg_bsatn: Bytes,
40}
41
42impl From<&ReducerContext> for txdata::Inputs {
43    fn from(
44        ReducerContext {
45            name,
46            caller_identity,
47            caller_connection_id,
48            timestamp,
49            arg_bsatn,
50        }: &ReducerContext,
51    ) -> Self {
52        let reducer_name = Arc::new(Varchar::from_str_truncate(name));
53        let cap = arg_bsatn.len()
54        /* caller_identity */
55        + 32
56        /* caller_connection_id */
57        + 16
58        /* timestamp */
59        + 8;
60        let mut buf = Vec::with_capacity(cap);
61        bsatn::to_writer(&mut buf, caller_identity).unwrap();
62        bsatn::to_writer(&mut buf, caller_connection_id).unwrap();
63        bsatn::to_writer(&mut buf, timestamp).unwrap();
64        buf.extend_from_slice(arg_bsatn);
65
66        txdata::Inputs {
67            reducer_name,
68            reducer_args: buf.into(),
69        }
70    }
71}
72
73impl TryFrom<&txdata::Inputs> for ReducerContext {
74    type Error = bsatn::DecodeError;
75
76    fn try_from(inputs: &txdata::Inputs) -> Result<Self, Self::Error> {
77        let args = &mut inputs.reducer_args.as_ref();
78        let caller_identity = bsatn::from_reader(args)?;
79        let caller_connection_id = bsatn::from_reader(args)?;
80        let timestamp = bsatn::from_reader(args)?;
81
82        Ok(Self {
83            name: inputs.reducer_name.to_string(),
84            caller_identity,
85            caller_connection_id,
86            timestamp,
87            arg_bsatn: Bytes::from(args.to_owned()),
88        })
89    }
90}
91
92/// Represents the type of workload that is being executed.
93///
94/// Used as constructor helper for [ExecutionContext].
95#[derive(Clone)]
96pub enum Workload {
97    #[cfg(any(test, feature = "test"))]
98    ForTests,
99    Reducer(ReducerContext),
100    Sql,
101    Subscribe,
102    Unsubscribe,
103    Update,
104    Internal,
105}
106
107impl Workload {
108    pub fn workload_type(&self) -> WorkloadType {
109        match self {
110            #[cfg(any(test, feature = "test"))]
111            Self::ForTests => WorkloadType::Internal,
112            Self::Reducer(_) => WorkloadType::Reducer,
113            Self::Sql => WorkloadType::Sql,
114            Self::Subscribe => WorkloadType::Subscribe,
115            Self::Unsubscribe => WorkloadType::Unsubscribe,
116            Self::Update => WorkloadType::Update,
117            Self::Internal => WorkloadType::Internal,
118        }
119    }
120}
121
122/// Classifies a transaction according to its workload.
123/// A transaction can be executing a reducer.
124/// It can be used to satisfy a one-off sql query or subscription.
125/// It can also be an internal operation that is not associated with a reducer or sql request.
126#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr, enum_map::Enum)]
127pub enum WorkloadType {
128    Reducer,
129    Sql,
130    Subscribe,
131    Unsubscribe,
132    Update,
133    Internal,
134}
135
136impl Default for WorkloadType {
137    fn default() -> Self {
138        Self::Internal
139    }
140}
141
142impl ExecutionContext {
143    /// Returns an [ExecutionContext] with the provided parameters and empty metrics.
144    fn new(database_identity: Identity, reducer: Option<ReducerContext>, workload: WorkloadType) -> Self {
145        Self {
146            database_identity,
147            reducer,
148            workload,
149        }
150    }
151
152    /// Returns an [ExecutionContext] with the provided [Workload] and empty metrics.
153    pub(crate) fn with_workload(database: Identity, workload: Workload) -> Self {
154        match workload {
155            #[cfg(any(test, feature = "test"))]
156            Workload::ForTests => Self::new(database, None, WorkloadType::Internal),
157            Workload::Internal => Self::new(database, None, WorkloadType::Internal),
158            Workload::Reducer(ctx) => Self::new(database, Some(ctx), WorkloadType::Reducer),
159            Workload::Sql => Self::new(database, None, WorkloadType::Sql),
160            Workload::Subscribe => Self::new(database, None, WorkloadType::Subscribe),
161            Workload::Unsubscribe => Self::new(database, None, WorkloadType::Unsubscribe),
162            Workload::Update => Self::new(database, None, WorkloadType::Update),
163        }
164    }
165
166    /// Returns the identity of the database on which we are operating.
167    #[inline]
168    pub fn database_identity(&self) -> Identity {
169        self.database_identity
170    }
171
172    /// If this is a reducer context, returns the name of the reducer.
173    #[inline]
174    pub fn reducer_name(&self) -> &str {
175        self.reducer.as_ref().map(|ctx| ctx.name.as_str()).unwrap_or_default()
176    }
177
178    /// If this is a reducer context, returns the name of the reducer.
179    #[inline]
180    pub fn into_reducer_name(self) -> String {
181        self.reducer.map(|ctx| ctx.name).unwrap_or_default()
182    }
183
184    /// If this is a reducer context, returns the full reducer metadata.
185    #[inline]
186    pub fn reducer_context(&self) -> Option<&ReducerContext> {
187        self.reducer.as_ref()
188    }
189
190    /// Returns the type of workload that is being executed.
191    #[inline]
192    pub fn workload(&self) -> WorkloadType {
193        self.workload
194    }
195}