1use serde::{Deserialize, Serialize};
8use serde_json::{json, Value as Json};
9
10pub const VM_STATE_SCHEMA_VERSION: &str = "vm_state.v1";
12
13#[must_use]
14pub fn default_vm_state_schema_version() -> String {
15 VM_STATE_SCHEMA_VERSION.to_string()
16}
17
18#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
20pub struct CompatibilityMeta {
21 pub family: String,
23 pub version: String,
25 pub backward_compatible_from: Vec<String>,
27}
28
29impl Default for CompatibilityMeta {
30 fn default() -> Self {
31 Self {
32 family: "vm_state".to_string(),
33 version: VM_STATE_SCHEMA_VERSION.to_string(),
34 backward_compatible_from: vec!["vm_state.v0".to_string()],
35 }
36 }
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
41pub struct EndpointRef {
42 pub sid: u64,
43 pub role: String,
44}
45
46#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
48pub struct SessionView {
49 pub sid: u64,
50 pub roles: Vec<String>,
51 pub status: String,
52 pub epoch: u64,
53}
54
55#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
57#[serde(bound(deserialize = "G: Deserialize<'de>, E: Deserialize<'de>"))]
58pub struct CoroutineState<G, E> {
59 pub id: u64,
60 #[serde(rename = "programId", alias = "program_id")]
61 pub program_id: u64,
62 pub pc: u64,
63 pub status: G,
64 #[serde(rename = "ownedEndpoints", alias = "owned_endpoints")]
65 pub owned_endpoints: Vec<EndpointRef>,
66 #[serde(rename = "costBudget", alias = "cost_budget")]
67 pub cost_budget: u64,
68 #[serde(rename = "effectCtx", alias = "effect_ctx")]
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub effect_ctx: Option<E>,
71}
72
73#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
75#[serde(bound(deserialize = "E: Deserialize<'de>"))]
76pub struct TickedObsEvent<E> {
77 pub tick: u64,
78 pub event: E,
79}
80
81#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
83#[serde(bound(deserialize = "G: Deserialize<'de>, E: Deserialize<'de>"))]
84pub struct VMState<G, E> {
85 #[serde(default = "default_vm_state_schema_version")]
86 pub schema_version: String,
87 #[serde(default)]
88 pub compatibility: CompatibilityMeta,
89 pub clock: u64,
90 #[serde(rename = "nextCoroId", alias = "next_coro_id")]
91 pub next_coro_id: u64,
92 #[serde(rename = "nextSessionId", alias = "next_session_id")]
93 pub next_session_id: u64,
94 pub coroutines: Vec<CoroutineState<G, E>>,
95 pub sessions: Vec<SessionView>,
96 #[serde(rename = "obsTrace", alias = "obs_trace")]
97 pub obs_trace: Vec<TickedObsEvent<E>>,
98}
99
100pub fn vm_state_to_json<G, E>(vm: &VMState<G, E>) -> Result<Json, serde_json::Error>
102where
103 G: Serialize,
104 E: Serialize,
105{
106 let coroutines: Vec<Json> = vm
107 .coroutines
108 .iter()
109 .map(coroutine_to_json)
110 .collect::<Result<_, _>>()?;
111 let obs_trace: Vec<Json> = vm
112 .obs_trace
113 .iter()
114 .map(event_to_json)
115 .collect::<Result<_, _>>()?;
116
117 Ok(json!({
118 "schema_version": VM_STATE_SCHEMA_VERSION,
119 "compatibility": vm.compatibility,
120 "clock": vm.clock,
121 "nextCoroId": vm.next_coro_id,
122 "nextSessionId": vm.next_session_id,
123 "coroutines": coroutines,
124 "sessions": sessions_to_json(&vm.sessions),
125 "obsTrace": obs_trace,
126 }))
127}
128
129pub fn vm_state_from_json<G, E>(value: Json) -> Result<VMState<G, E>, serde_json::Error>
131where
132 G: for<'de> Deserialize<'de>,
133 E: for<'de> Deserialize<'de>,
134{
135 serde_json::from_value(value)
136}
137
138pub fn coroutine_to_json<G, E>(coro: &CoroutineState<G, E>) -> Result<Json, serde_json::Error>
140where
141 G: Serialize,
142 E: Serialize,
143{
144 let effect_ctx = match &coro.effect_ctx {
145 Some(ctx) => serde_json::to_value(ctx)?,
146 None => Json::Null,
147 };
148 Ok(json!({
149 "id": coro.id,
150 "programId": coro.program_id,
151 "pc": coro.pc,
152 "status": status_to_json(&coro.status)?,
153 "ownedEndpoints": coro.owned_endpoints.iter().map(endpoint_to_json).collect::<Vec<_>>(),
154 "costBudget": coro.cost_budget,
155 "effectCtx": effect_ctx,
156 }))
157}
158
159pub fn event_to_json<E>(event: &TickedObsEvent<E>) -> Result<Json, serde_json::Error>
161where
162 E: Serialize,
163{
164 Ok(json!({
165 "schema_version": VM_STATE_SCHEMA_VERSION,
166 "tick": event.tick,
167 "event": obs_event_to_json(&event.event)?,
168 }))
169}
170
171pub fn status_to_json<S>(status: &S) -> Result<Json, serde_json::Error>
173where
174 S: Serialize,
175{
176 serde_json::to_value(status)
177}
178
179#[must_use]
181pub fn sessions_to_json(sessions: &[SessionView]) -> Json {
182 Json::Array(
183 sessions
184 .iter()
185 .map(|s| {
186 json!({
187 "sid": s.sid,
188 "roles": s.roles,
189 "status": s.status,
190 "epoch": s.epoch,
191 })
192 })
193 .collect(),
194 )
195}
196
197#[must_use]
199pub fn endpoint_to_json(endpoint: &EndpointRef) -> Json {
200 json!({
201 "sid": endpoint.sid,
202 "role": endpoint.role,
203 })
204}
205
206pub fn obs_event_to_json<E>(event: &E) -> Result<Json, serde_json::Error>
208where
209 E: Serialize,
210{
211 serde_json::to_value(event)
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217
218 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
219 struct Status {
220 kind: String,
221 }
222
223 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
224 struct Event {
225 kind: String,
226 label: String,
227 }
228
229 #[test]
230 fn vm_export_includes_schema_and_compatibility() {
231 let vm = VMState {
232 schema_version: default_vm_state_schema_version(),
233 compatibility: CompatibilityMeta::default(),
234 clock: 7,
235 next_coro_id: 3,
236 next_session_id: 2,
237 coroutines: vec![CoroutineState {
238 id: 0,
239 program_id: 1,
240 pc: 4,
241 status: Status {
242 kind: "ready".to_string(),
243 },
244 owned_endpoints: vec![EndpointRef {
245 sid: 1,
246 role: "A".to_string(),
247 }],
248 cost_budget: 100,
249 effect_ctx: Some(Event {
250 kind: "ctx".to_string(),
251 label: "ok".to_string(),
252 }),
253 }],
254 sessions: vec![SessionView {
255 sid: 1,
256 roles: vec!["A".to_string(), "B".to_string()],
257 status: "active".to_string(),
258 epoch: 0,
259 }],
260 obs_trace: vec![TickedObsEvent {
261 tick: 7,
262 event: Event {
263 kind: "sent".to_string(),
264 label: "msg".to_string(),
265 },
266 }],
267 };
268
269 let json = vm_state_to_json(&vm).expect("encode vm state");
270 assert_eq!(json["schema_version"], VM_STATE_SCHEMA_VERSION);
271 assert_eq!(json["compatibility"]["family"], "vm_state");
272 assert_eq!(json["nextCoroId"], 3);
273 assert_eq!(json["nextSessionId"], 2);
274 assert_eq!(json["coroutines"][0]["ownedEndpoints"][0]["sid"], 1);
275 assert_eq!(json["obsTrace"][0]["tick"], 7);
276 }
277
278 #[test]
279 fn vm_export_roundtrip_via_json_decoder() {
280 let vm = VMState {
281 schema_version: default_vm_state_schema_version(),
282 compatibility: CompatibilityMeta::default(),
283 clock: 9,
284 next_coro_id: 5,
285 next_session_id: 4,
286 coroutines: vec![CoroutineState {
287 id: 1,
288 program_id: 2,
289 pc: 3,
290 status: Status {
291 kind: "blocked".to_string(),
292 },
293 owned_endpoints: vec![EndpointRef {
294 sid: 1,
295 role: "A".to_string(),
296 }],
297 cost_budget: 44,
298 effect_ctx: None::<Event>,
299 }],
300 sessions: vec![SessionView {
301 sid: 1,
302 roles: vec!["A".to_string(), "B".to_string()],
303 status: "active".to_string(),
304 epoch: 2,
305 }],
306 obs_trace: vec![TickedObsEvent {
307 tick: 0,
308 event: Event {
309 kind: "sent".to_string(),
310 label: "msg".to_string(),
311 },
312 }],
313 };
314
315 let encoded = vm_state_to_json(&vm).expect("encode vm state");
316 let decoded: VMState<Status, Event> = vm_state_from_json(encoded).expect("decode vm state");
317 assert_eq!(decoded.schema_version, VM_STATE_SCHEMA_VERSION);
318 assert_eq!(decoded.next_coro_id, 5);
319 assert_eq!(decoded.coroutines[0].program_id, 2);
320 assert_eq!(decoded.obs_trace[0].tick, 0);
321 }
322
323 #[test]
324 fn vm_export_legacy_aliases_decode() {
325 let legacy = json!({
326 "schema_version": "vm_state.v0",
327 "clock": 1,
328 "next_coro_id": 7,
329 "next_session_id": 3,
330 "coroutines": [{
331 "id": 0,
332 "program_id": 4,
333 "pc": 0,
334 "status": {"kind": "ready"},
335 "owned_endpoints": [{"sid": 1, "role": "A"}],
336 "cost_budget": 99,
337 "effect_ctx": null
338 }],
339 "sessions": [{
340 "sid": 1,
341 "roles": ["A", "B"],
342 "status": "active",
343 "epoch": 0
344 }],
345 "obs_trace": [{
346 "tick": 1,
347 "event": {"kind": "sent", "label": "msg"}
348 }]
349 });
350
351 let decoded: VMState<Status, Event> =
352 vm_state_from_json(legacy).expect("decode legacy vm state");
353 assert_eq!(decoded.schema_version, "vm_state.v0");
354 assert_eq!(decoded.next_coro_id, 7);
355 assert_eq!(decoded.coroutines[0].program_id, 4);
356 assert_eq!(decoded.obs_trace.len(), 1);
357 }
358
359 #[test]
360 fn vm_export_serialization_failure_is_not_silenced() {
361 struct FailingSerialize;
362 impl Serialize for FailingSerialize {
363 fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
364 where
365 S: serde::Serializer,
366 {
367 Err(serde::ser::Error::custom("intentional failure"))
368 }
369 }
370
371 let err = status_to_json(&FailingSerialize).expect_err("must surface serialization error");
372 assert!(err.to_string().contains("intentional failure"));
373 }
374}