spacetimedb/
execution_context.rs1use std::sync::Arc;
2
3use bytes::Bytes;
4use derive_more::Display;
5use spacetimedb_commitlog::{payload::txdata, Varchar};
6use spacetimedb_lib::{ConnectionId, Identity, Timestamp};
7use spacetimedb_sats::bsatn;
8
9#[derive(Clone)]
13pub struct ExecutionContext {
14 pub database_identity: Identity,
16 pub reducer: Option<ReducerContext>,
18 pub workload: WorkloadType,
20}
21
22#[derive(Clone)]
26pub struct ReducerContext {
27 pub name: String,
29 pub caller_identity: Identity,
31 pub caller_connection_id: ConnectionId,
33 pub timestamp: Timestamp,
35 pub arg_bsatn: Bytes,
40}
41
42impl From<&ReducerContext> for txdata::Inputs {
43 fn from(
44 ReducerContext {
45 name,
46 caller_identity,
47 caller_connection_id,
48 timestamp,
49 arg_bsatn,
50 }: &ReducerContext,
51 ) -> Self {
52 let reducer_name = Arc::new(Varchar::from_str_truncate(name));
53 let cap = arg_bsatn.len()
54 + 32
56 + 16
58 + 8;
60 let mut buf = Vec::with_capacity(cap);
61 bsatn::to_writer(&mut buf, caller_identity).unwrap();
62 bsatn::to_writer(&mut buf, caller_connection_id).unwrap();
63 bsatn::to_writer(&mut buf, timestamp).unwrap();
64 buf.extend_from_slice(arg_bsatn);
65
66 txdata::Inputs {
67 reducer_name,
68 reducer_args: buf.into(),
69 }
70 }
71}
72
73impl TryFrom<&txdata::Inputs> for ReducerContext {
74 type Error = bsatn::DecodeError;
75
76 fn try_from(inputs: &txdata::Inputs) -> Result<Self, Self::Error> {
77 let args = &mut inputs.reducer_args.as_ref();
78 let caller_identity = bsatn::from_reader(args)?;
79 let caller_connection_id = bsatn::from_reader(args)?;
80 let timestamp = bsatn::from_reader(args)?;
81
82 Ok(Self {
83 name: inputs.reducer_name.to_string(),
84 caller_identity,
85 caller_connection_id,
86 timestamp,
87 arg_bsatn: Bytes::from(args.to_owned()),
88 })
89 }
90}
91
92#[derive(Clone)]
96pub enum Workload {
97 #[cfg(any(test, feature = "test"))]
98 ForTests,
99 Reducer(ReducerContext),
100 Sql,
101 Subscribe,
102 Unsubscribe,
103 Update,
104 Internal,
105}
106
107impl Workload {
108 pub fn workload_type(&self) -> WorkloadType {
109 match self {
110 #[cfg(any(test, feature = "test"))]
111 Self::ForTests => WorkloadType::Internal,
112 Self::Reducer(_) => WorkloadType::Reducer,
113 Self::Sql => WorkloadType::Sql,
114 Self::Subscribe => WorkloadType::Subscribe,
115 Self::Unsubscribe => WorkloadType::Unsubscribe,
116 Self::Update => WorkloadType::Update,
117 Self::Internal => WorkloadType::Internal,
118 }
119 }
120}
121
122#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr, enum_map::Enum)]
127pub enum WorkloadType {
128 Reducer,
129 Sql,
130 Subscribe,
131 Unsubscribe,
132 Update,
133 Internal,
134}
135
136impl Default for WorkloadType {
137 fn default() -> Self {
138 Self::Internal
139 }
140}
141
142impl ExecutionContext {
143 fn new(database_identity: Identity, reducer: Option<ReducerContext>, workload: WorkloadType) -> Self {
145 Self {
146 database_identity,
147 reducer,
148 workload,
149 }
150 }
151
152 pub(crate) fn with_workload(database: Identity, workload: Workload) -> Self {
154 match workload {
155 #[cfg(any(test, feature = "test"))]
156 Workload::ForTests => Self::new(database, None, WorkloadType::Internal),
157 Workload::Internal => Self::new(database, None, WorkloadType::Internal),
158 Workload::Reducer(ctx) => Self::new(database, Some(ctx), WorkloadType::Reducer),
159 Workload::Sql => Self::new(database, None, WorkloadType::Sql),
160 Workload::Subscribe => Self::new(database, None, WorkloadType::Subscribe),
161 Workload::Unsubscribe => Self::new(database, None, WorkloadType::Unsubscribe),
162 Workload::Update => Self::new(database, None, WorkloadType::Update),
163 }
164 }
165
166 #[inline]
168 pub fn database_identity(&self) -> Identity {
169 self.database_identity
170 }
171
172 #[inline]
174 pub fn reducer_name(&self) -> &str {
175 self.reducer.as_ref().map(|ctx| ctx.name.as_str()).unwrap_or_default()
176 }
177
178 #[inline]
180 pub fn into_reducer_name(self) -> String {
181 self.reducer.map(|ctx| ctx.name).unwrap_or_default()
182 }
183
184 #[inline]
186 pub fn reducer_context(&self) -> Option<&ReducerContext> {
187 self.reducer.as_ref()
188 }
189
190 #[inline]
192 pub fn workload(&self) -> WorkloadType {
193 self.workload
194 }
195}