Skip to main content

rivetkit_inspector_protocol/
versioned.rs

1use anyhow::{Result, bail};
2use serde_bare::Uint;
3use vbare::OwnedVersionedData;
4
5use crate::generated::{v1, v2, v3, v4};
6
7const WORKFLOW_HISTORY_DROPPED_ERROR: &str = "inspector.workflow_history_dropped";
8const QUEUE_DROPPED_ERROR: &str = "inspector.queue_dropped";
9const TRACE_DROPPED_ERROR: &str = "inspector.trace_dropped";
10const DATABASE_DROPPED_ERROR: &str = "inspector.database_dropped";
11
12pub enum ToServer {
13	V1(v1::ToServer),
14	V2(v2::ToServer),
15	V3(v3::ToServer),
16	V4(v4::ToServer),
17}
18
19impl OwnedVersionedData for ToServer {
20	type Latest = v4::ToServer;
21
22	fn wrap_latest(latest: Self::Latest) -> Self {
23		Self::V4(latest)
24	}
25
26	fn unwrap_latest(self) -> Result<Self::Latest> {
27		match self {
28			Self::V4(data) => Ok(data),
29			_ => bail!("version not latest"),
30		}
31	}
32
33	fn deserialize_version(payload: &[u8], version: u16) -> Result<Self> {
34		match version {
35			1 => Ok(Self::V1(serde_bare::from_slice(payload)?)),
36			2 => Ok(Self::V2(serde_bare::from_slice(payload)?)),
37			3 => Ok(Self::V3(serde_bare::from_slice(payload)?)),
38			4 => Ok(Self::V4(serde_bare::from_slice(payload)?)),
39			_ => bail!("invalid inspector protocol version for ToServer: {version}"),
40		}
41	}
42
43	fn serialize_version(self, version: u16) -> Result<Vec<u8>> {
44		match (self, version) {
45			(Self::V1(data), 1) => serde_bare::to_vec(&data).map_err(Into::into),
46			(Self::V2(data), 2) => serde_bare::to_vec(&data).map_err(Into::into),
47			(Self::V3(data), 3) => serde_bare::to_vec(&data).map_err(Into::into),
48			(Self::V4(data), 4) => serde_bare::to_vec(&data).map_err(Into::into),
49			(_, version) => bail!("unexpected inspector protocol version for ToServer: {version}"),
50		}
51	}
52
53	fn deserialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
54		vec![Self::v1_to_v2, Self::v2_to_v3, Self::v3_to_v4]
55	}
56
57	fn serialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
58		vec![Self::v4_to_v3, Self::v3_to_v2, Self::v2_to_v1]
59	}
60}
61
62impl ToServer {
63	fn v1_to_v2(self) -> Result<Self> {
64		let Self::V1(data) = self else {
65			bail!("expected inspector protocol v1 ToServer")
66		};
67
68		let body = match data.body {
69			v1::ToServerBody::PatchStateRequest(req) => {
70				v2::ToServerBody::PatchStateRequest(req.into())
71			}
72			v1::ToServerBody::StateRequest(req) => v2::ToServerBody::StateRequest(req.into()),
73			v1::ToServerBody::ConnectionsRequest(req) => {
74				v2::ToServerBody::ConnectionsRequest(req.into())
75			}
76			v1::ToServerBody::ActionRequest(req) => v2::ToServerBody::ActionRequest(req.into()),
77			v1::ToServerBody::RpcsListRequest(req) => v2::ToServerBody::RpcsListRequest(req.into()),
78			v1::ToServerBody::EventsRequest(_) | v1::ToServerBody::ClearEventsRequest(_) => {
79				bail!("cannot convert inspector v1 events requests to v2")
80			}
81		};
82
83		Ok(Self::V2(v2::ToServer { body }))
84	}
85
86	fn v2_to_v3(self) -> Result<Self> {
87		let Self::V2(data) = self else {
88			bail!("expected inspector protocol v2 ToServer")
89		};
90		Ok(Self::V3(data.into()))
91	}
92
93	fn v3_to_v4(self) -> Result<Self> {
94		let Self::V3(data) = self else {
95			bail!("expected inspector protocol v3 ToServer")
96		};
97
98		let body = match data.body {
99			v3::ToServerBody::PatchStateRequest(req) => {
100				v4::ToServerBody::PatchStateRequest(req.into())
101			}
102			v3::ToServerBody::StateRequest(req) => v4::ToServerBody::StateRequest(req.into()),
103			v3::ToServerBody::ConnectionsRequest(req) => {
104				v4::ToServerBody::ConnectionsRequest(req.into())
105			}
106			v3::ToServerBody::ActionRequest(req) => v4::ToServerBody::ActionRequest(req.into()),
107			v3::ToServerBody::RpcsListRequest(req) => v4::ToServerBody::RpcsListRequest(req.into()),
108			v3::ToServerBody::TraceQueryRequest(req) => {
109				v4::ToServerBody::TraceQueryRequest(req.into())
110			}
111			v3::ToServerBody::QueueRequest(req) => v4::ToServerBody::QueueRequest(req.into()),
112			v3::ToServerBody::WorkflowHistoryRequest(req) => {
113				v4::ToServerBody::WorkflowHistoryRequest(req.into())
114			}
115			v3::ToServerBody::DatabaseSchemaRequest(req) => {
116				v4::ToServerBody::DatabaseSchemaRequest(req.into())
117			}
118			v3::ToServerBody::DatabaseTableRowsRequest(req) => {
119				v4::ToServerBody::DatabaseTableRowsRequest(req.into())
120			}
121		};
122
123		Ok(Self::V4(v4::ToServer { body }))
124	}
125
126	fn v4_to_v3(self) -> Result<Self> {
127		let Self::V4(data) = self else {
128			bail!("expected inspector protocol v4 ToServer")
129		};
130
131		let body = match data.body {
132			v4::ToServerBody::PatchStateRequest(req) => {
133				v3::ToServerBody::PatchStateRequest(req.into())
134			}
135			v4::ToServerBody::StateRequest(req) => v3::ToServerBody::StateRequest(req.into()),
136			v4::ToServerBody::ConnectionsRequest(req) => {
137				v3::ToServerBody::ConnectionsRequest(req.into())
138			}
139			v4::ToServerBody::ActionRequest(req) => v3::ToServerBody::ActionRequest(req.into()),
140			v4::ToServerBody::RpcsListRequest(req) => v3::ToServerBody::RpcsListRequest(req.into()),
141			v4::ToServerBody::TraceQueryRequest(req) => {
142				v3::ToServerBody::TraceQueryRequest(req.into())
143			}
144			v4::ToServerBody::QueueRequest(req) => v3::ToServerBody::QueueRequest(req.into()),
145			v4::ToServerBody::WorkflowHistoryRequest(req) => {
146				v3::ToServerBody::WorkflowHistoryRequest(req.into())
147			}
148			v4::ToServerBody::WorkflowReplayRequest(_) => {
149				bail!("cannot convert inspector v4 workflow replay requests to v3")
150			}
151			v4::ToServerBody::DatabaseSchemaRequest(req) => {
152				v3::ToServerBody::DatabaseSchemaRequest(req.into())
153			}
154			v4::ToServerBody::DatabaseTableRowsRequest(req) => {
155				v3::ToServerBody::DatabaseTableRowsRequest(req.into())
156			}
157		};
158
159		Ok(Self::V3(v3::ToServer { body }))
160	}
161
162	fn v3_to_v2(self) -> Result<Self> {
163		let Self::V3(data) = self else {
164			bail!("expected inspector protocol v3 ToServer")
165		};
166
167		let body = match data.body {
168			v3::ToServerBody::PatchStateRequest(req) => {
169				v2::ToServerBody::PatchStateRequest(req.into())
170			}
171			v3::ToServerBody::StateRequest(req) => v2::ToServerBody::StateRequest(req.into()),
172			v3::ToServerBody::ConnectionsRequest(req) => {
173				v2::ToServerBody::ConnectionsRequest(req.into())
174			}
175			v3::ToServerBody::ActionRequest(req) => v2::ToServerBody::ActionRequest(req.into()),
176			v3::ToServerBody::RpcsListRequest(req) => v2::ToServerBody::RpcsListRequest(req.into()),
177			v3::ToServerBody::TraceQueryRequest(req) => {
178				v2::ToServerBody::TraceQueryRequest(req.into())
179			}
180			v3::ToServerBody::QueueRequest(req) => v2::ToServerBody::QueueRequest(req.into()),
181			v3::ToServerBody::WorkflowHistoryRequest(req) => {
182				v2::ToServerBody::WorkflowHistoryRequest(req.into())
183			}
184			v3::ToServerBody::DatabaseSchemaRequest(_)
185			| v3::ToServerBody::DatabaseTableRowsRequest(_) => {
186				bail!("cannot convert inspector v3 database requests to v2")
187			}
188		};
189
190		Ok(Self::V2(v2::ToServer { body }))
191	}
192
193	fn v2_to_v1(self) -> Result<Self> {
194		let Self::V2(data) = self else {
195			bail!("expected inspector protocol v2 ToServer")
196		};
197
198		let body = match data.body {
199			v2::ToServerBody::PatchStateRequest(req) => {
200				v1::ToServerBody::PatchStateRequest(req.into())
201			}
202			v2::ToServerBody::StateRequest(req) => v1::ToServerBody::StateRequest(req.into()),
203			v2::ToServerBody::ConnectionsRequest(req) => {
204				v1::ToServerBody::ConnectionsRequest(req.into())
205			}
206			v2::ToServerBody::ActionRequest(req) => v1::ToServerBody::ActionRequest(req.into()),
207			v2::ToServerBody::RpcsListRequest(req) => v1::ToServerBody::RpcsListRequest(req.into()),
208			v2::ToServerBody::TraceQueryRequest(_)
209			| v2::ToServerBody::QueueRequest(_)
210			| v2::ToServerBody::WorkflowHistoryRequest(_) => {
211				bail!("cannot convert inspector v2 queue/trace/workflow requests to v1")
212			}
213		};
214
215		Ok(Self::V1(v1::ToServer { body }))
216	}
217}
218
219pub enum ToClient {
220	V1(v1::ToClient),
221	V2(v2::ToClient),
222	V3(v3::ToClient),
223	V4(v4::ToClient),
224}
225
226impl OwnedVersionedData for ToClient {
227	type Latest = v4::ToClient;
228
229	fn wrap_latest(latest: Self::Latest) -> Self {
230		Self::V4(latest)
231	}
232
233	fn unwrap_latest(self) -> Result<Self::Latest> {
234		match self {
235			Self::V4(data) => Ok(data),
236			_ => bail!("version not latest"),
237		}
238	}
239
240	fn deserialize_version(payload: &[u8], version: u16) -> Result<Self> {
241		match version {
242			1 => Ok(Self::V1(serde_bare::from_slice(payload)?)),
243			2 => Ok(Self::V2(serde_bare::from_slice(payload)?)),
244			3 => Ok(Self::V3(serde_bare::from_slice(payload)?)),
245			4 => Ok(Self::V4(serde_bare::from_slice(payload)?)),
246			_ => bail!("invalid inspector protocol version for ToClient: {version}"),
247		}
248	}
249
250	fn serialize_version(self, version: u16) -> Result<Vec<u8>> {
251		match (self, version) {
252			(Self::V1(data), 1) => serde_bare::to_vec(&data).map_err(Into::into),
253			(Self::V2(data), 2) => serde_bare::to_vec(&data).map_err(Into::into),
254			(Self::V3(data), 3) => serde_bare::to_vec(&data).map_err(Into::into),
255			(Self::V4(data), 4) => serde_bare::to_vec(&data).map_err(Into::into),
256			(_, version) => bail!("unexpected inspector protocol version for ToClient: {version}"),
257		}
258	}
259
260	fn deserialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
261		vec![Self::v1_to_v2, Self::v2_to_v3, Self::v3_to_v4]
262	}
263
264	fn serialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
265		vec![Self::v4_to_v3, Self::v3_to_v2, Self::v2_to_v1]
266	}
267}
268
269impl ToClient {
270	fn v1_to_v2(self) -> Result<Self> {
271		let Self::V1(data) = self else {
272			bail!("expected inspector protocol v1 ToClient")
273		};
274
275		let body = match data.body {
276			v1::ToClientBody::StateResponse(resp) => v2::ToClientBody::StateResponse(resp.into()),
277			v1::ToClientBody::ConnectionsResponse(resp) => {
278				v2::ToClientBody::ConnectionsResponse(resp.into())
279			}
280			v1::ToClientBody::ActionResponse(resp) => v2::ToClientBody::ActionResponse(resp.into()),
281			v1::ToClientBody::RpcsListResponse(resp) => {
282				v2::ToClientBody::RpcsListResponse(resp.into())
283			}
284			v1::ToClientBody::ConnectionsUpdated(update) => {
285				v2::ToClientBody::ConnectionsUpdated(update.into())
286			}
287			v1::ToClientBody::StateUpdated(update) => v2::ToClientBody::StateUpdated(update.into()),
288			v1::ToClientBody::Error(error) => v2::ToClientBody::Error(error.into()),
289			v1::ToClientBody::Init(init) => v2::ToClientBody::Init(v2::Init {
290				connections: convert_vec(init.connections),
291				state: init.state,
292				is_state_enabled: init.is_state_enabled,
293				rpcs: init.rpcs,
294				is_database_enabled: init.is_database_enabled,
295				queue_size: Uint(0),
296				workflow_history: None,
297				is_workflow_enabled: false,
298			}),
299			v1::ToClientBody::EventsResponse(_) | v1::ToClientBody::EventsUpdated(_) => {
300				bail!("cannot convert inspector v1 events responses to v2")
301			}
302		};
303
304		Ok(Self::V2(v2::ToClient { body }))
305	}
306
307	fn v2_to_v3(self) -> Result<Self> {
308		let Self::V2(data) = self else {
309			bail!("expected inspector protocol v2 ToClient")
310		};
311		Ok(Self::V3(data.into()))
312	}
313
314	fn v3_to_v4(self) -> Result<Self> {
315		let Self::V3(data) = self else {
316			bail!("expected inspector protocol v3 ToClient")
317		};
318
319		let body = match data.body {
320			v3::ToClientBody::StateResponse(resp) => v4::ToClientBody::StateResponse(resp.into()),
321			v3::ToClientBody::ConnectionsResponse(resp) => {
322				v4::ToClientBody::ConnectionsResponse(resp.into())
323			}
324			v3::ToClientBody::ActionResponse(resp) => v4::ToClientBody::ActionResponse(resp.into()),
325			v3::ToClientBody::ConnectionsUpdated(update) => {
326				v4::ToClientBody::ConnectionsUpdated(update.into())
327			}
328			v3::ToClientBody::QueueUpdated(update) => v4::ToClientBody::QueueUpdated(update.into()),
329			v3::ToClientBody::StateUpdated(update) => v4::ToClientBody::StateUpdated(update.into()),
330			v3::ToClientBody::WorkflowHistoryUpdated(update) => {
331				v4::ToClientBody::WorkflowHistoryUpdated(update.into())
332			}
333			v3::ToClientBody::RpcsListResponse(resp) => {
334				v4::ToClientBody::RpcsListResponse(resp.into())
335			}
336			v3::ToClientBody::TraceQueryResponse(resp) => {
337				v4::ToClientBody::TraceQueryResponse(resp.into())
338			}
339			v3::ToClientBody::QueueResponse(resp) => v4::ToClientBody::QueueResponse(resp.into()),
340			v3::ToClientBody::WorkflowHistoryResponse(resp) => {
341				v4::ToClientBody::WorkflowHistoryResponse(resp.into())
342			}
343			v3::ToClientBody::Error(error) => v4::ToClientBody::Error(error.into()),
344			v3::ToClientBody::Init(init) => v4::ToClientBody::Init(init.into()),
345			v3::ToClientBody::DatabaseSchemaResponse(resp) => {
346				v4::ToClientBody::DatabaseSchemaResponse(resp.into())
347			}
348			v3::ToClientBody::DatabaseTableRowsResponse(resp) => {
349				v4::ToClientBody::DatabaseTableRowsResponse(resp.into())
350			}
351		};
352
353		Ok(Self::V4(v4::ToClient { body }))
354	}
355
356	fn v4_to_v3(self) -> Result<Self> {
357		let Self::V4(data) = self else {
358			bail!("expected inspector protocol v4 ToClient")
359		};
360
361		let body = match data.body {
362			v4::ToClientBody::StateResponse(resp) => v3::ToClientBody::StateResponse(resp.into()),
363			v4::ToClientBody::ConnectionsResponse(resp) => {
364				v3::ToClientBody::ConnectionsResponse(resp.into())
365			}
366			v4::ToClientBody::ActionResponse(resp) => v3::ToClientBody::ActionResponse(resp.into()),
367			v4::ToClientBody::ConnectionsUpdated(update) => {
368				v3::ToClientBody::ConnectionsUpdated(update.into())
369			}
370			v4::ToClientBody::QueueUpdated(update) => v3::ToClientBody::QueueUpdated(update.into()),
371			v4::ToClientBody::StateUpdated(update) => v3::ToClientBody::StateUpdated(update.into()),
372			v4::ToClientBody::WorkflowHistoryUpdated(update) => {
373				v3::ToClientBody::WorkflowHistoryUpdated(update.into())
374			}
375			v4::ToClientBody::RpcsListResponse(resp) => {
376				v3::ToClientBody::RpcsListResponse(resp.into())
377			}
378			v4::ToClientBody::TraceQueryResponse(resp) => {
379				v3::ToClientBody::TraceQueryResponse(resp.into())
380			}
381			v4::ToClientBody::QueueResponse(resp) => v3::ToClientBody::QueueResponse(resp.into()),
382			v4::ToClientBody::WorkflowHistoryResponse(resp) => {
383				v3::ToClientBody::WorkflowHistoryResponse(resp.into())
384			}
385			v4::ToClientBody::WorkflowReplayResponse(_) => {
386				v3::ToClientBody::Error(dropped_error(WORKFLOW_HISTORY_DROPPED_ERROR).into())
387			}
388			v4::ToClientBody::Error(error) => v3::ToClientBody::Error(error.into()),
389			v4::ToClientBody::Init(init) => v3::ToClientBody::Init(init.into()),
390			v4::ToClientBody::DatabaseSchemaResponse(resp) => {
391				v3::ToClientBody::DatabaseSchemaResponse(resp.into())
392			}
393			v4::ToClientBody::DatabaseTableRowsResponse(resp) => {
394				v3::ToClientBody::DatabaseTableRowsResponse(resp.into())
395			}
396		};
397
398		Ok(Self::V3(v3::ToClient { body }))
399	}
400
401	fn v3_to_v2(self) -> Result<Self> {
402		let Self::V3(data) = self else {
403			bail!("expected inspector protocol v3 ToClient")
404		};
405
406		let body = match data.body {
407			v3::ToClientBody::StateResponse(resp) => v2::ToClientBody::StateResponse(resp.into()),
408			v3::ToClientBody::ConnectionsResponse(resp) => {
409				v2::ToClientBody::ConnectionsResponse(resp.into())
410			}
411			v3::ToClientBody::ActionResponse(resp) => v2::ToClientBody::ActionResponse(resp.into()),
412			v3::ToClientBody::ConnectionsUpdated(update) => {
413				v2::ToClientBody::ConnectionsUpdated(update.into())
414			}
415			v3::ToClientBody::QueueUpdated(update) => v2::ToClientBody::QueueUpdated(update.into()),
416			v3::ToClientBody::StateUpdated(update) => v2::ToClientBody::StateUpdated(update.into()),
417			v3::ToClientBody::WorkflowHistoryUpdated(update) => {
418				v2::ToClientBody::WorkflowHistoryUpdated(update.into())
419			}
420			v3::ToClientBody::RpcsListResponse(resp) => {
421				v2::ToClientBody::RpcsListResponse(resp.into())
422			}
423			v3::ToClientBody::TraceQueryResponse(resp) => {
424				v2::ToClientBody::TraceQueryResponse(resp.into())
425			}
426			v3::ToClientBody::QueueResponse(resp) => v2::ToClientBody::QueueResponse(resp.into()),
427			v3::ToClientBody::WorkflowHistoryResponse(resp) => {
428				v2::ToClientBody::WorkflowHistoryResponse(resp.into())
429			}
430			v3::ToClientBody::Error(error) => v2::ToClientBody::Error(error.into()),
431			v3::ToClientBody::Init(init) => v2::ToClientBody::Init(init.into()),
432			v3::ToClientBody::DatabaseSchemaResponse(_)
433			| v3::ToClientBody::DatabaseTableRowsResponse(_) => {
434				v2::ToClientBody::Error(dropped_error(DATABASE_DROPPED_ERROR))
435			}
436		};
437
438		Ok(Self::V2(v2::ToClient { body }))
439	}
440
441	fn v2_to_v1(self) -> Result<Self> {
442		let Self::V2(data) = self else {
443			bail!("expected inspector protocol v2 ToClient")
444		};
445
446		let body = match data.body {
447			v2::ToClientBody::StateResponse(resp) => v1::ToClientBody::StateResponse(resp.into()),
448			v2::ToClientBody::ConnectionsResponse(resp) => {
449				v1::ToClientBody::ConnectionsResponse(resp.into())
450			}
451			v2::ToClientBody::ActionResponse(resp) => v1::ToClientBody::ActionResponse(resp.into()),
452			v2::ToClientBody::ConnectionsUpdated(update) => {
453				v1::ToClientBody::ConnectionsUpdated(update.into())
454			}
455			v2::ToClientBody::StateUpdated(update) => v1::ToClientBody::StateUpdated(update.into()),
456			v2::ToClientBody::RpcsListResponse(resp) => {
457				v1::ToClientBody::RpcsListResponse(resp.into())
458			}
459			v2::ToClientBody::Error(error) => v1::ToClientBody::Error(error.into()),
460			v2::ToClientBody::Init(init) => v1::ToClientBody::Init(v1::Init {
461				connections: init.connections.into_iter().map(Into::into).collect(),
462				events: Vec::new(),
463				state: init.state,
464				is_state_enabled: init.is_state_enabled,
465				rpcs: init.rpcs,
466				is_database_enabled: init.is_database_enabled,
467			}),
468			v2::ToClientBody::QueueUpdated(_) | v2::ToClientBody::QueueResponse(_) => {
469				v1::ToClientBody::Error(dropped_error(QUEUE_DROPPED_ERROR).into())
470			}
471			v2::ToClientBody::WorkflowHistoryUpdated(_)
472			| v2::ToClientBody::WorkflowHistoryResponse(_) => {
473				v1::ToClientBody::Error(dropped_error(WORKFLOW_HISTORY_DROPPED_ERROR).into())
474			}
475			v2::ToClientBody::TraceQueryResponse(_) => {
476				v1::ToClientBody::Error(dropped_error(TRACE_DROPPED_ERROR).into())
477			}
478		};
479
480		Ok(Self::V1(v1::ToClient { body }))
481	}
482}
483
484fn convert_vec<From, To>(values: Vec<From>) -> Vec<To>
485where
486	From: Into<To>,
487{
488	values.into_iter().map(Into::into).collect()
489}
490
491macro_rules! impl_same_fields_pair {
492	($left:ident, $right:ident, $ty:ident { $($field:ident),+ $(,)? }) => {
493		impl From<$left::$ty> for $right::$ty {
494			fn from(value: $left::$ty) -> Self {
495				Self {
496					$($field: value.$field),+
497				}
498			}
499		}
500
501		impl From<$right::$ty> for $left::$ty {
502			fn from(value: $right::$ty) -> Self {
503				Self {
504					$($field: value.$field),+
505				}
506			}
507		}
508	};
509}
510
511macro_rules! impl_connection_list_pair {
512	($left:ident, $right:ident, $ty:ident) => {
513		impl From<$left::$ty> for $right::$ty {
514			fn from(value: $left::$ty) -> Self {
515				Self {
516					connections: convert_vec(value.connections),
517				}
518			}
519		}
520
521		impl From<$right::$ty> for $left::$ty {
522			fn from(value: $right::$ty) -> Self {
523				Self {
524					connections: convert_vec(value.connections),
525				}
526			}
527		}
528	};
529}
530
531macro_rules! impl_connections_response_pair {
532	($left:ident, $right:ident) => {
533		impl From<$left::ConnectionsResponse> for $right::ConnectionsResponse {
534			fn from(value: $left::ConnectionsResponse) -> Self {
535				Self {
536					rid: value.rid,
537					connections: convert_vec(value.connections),
538				}
539			}
540		}
541
542		impl From<$right::ConnectionsResponse> for $left::ConnectionsResponse {
543			fn from(value: $right::ConnectionsResponse) -> Self {
544				Self {
545					rid: value.rid,
546					connections: convert_vec(value.connections),
547				}
548			}
549		}
550	};
551}
552
553macro_rules! impl_queue_status_pair {
554	($left:ident, $right:ident) => {
555		impl From<$left::QueueStatus> for $right::QueueStatus {
556			fn from(value: $left::QueueStatus) -> Self {
557				Self {
558					size: value.size,
559					max_size: value.max_size,
560					messages: convert_vec(value.messages),
561					truncated: value.truncated,
562				}
563			}
564		}
565
566		impl From<$right::QueueStatus> for $left::QueueStatus {
567			fn from(value: $right::QueueStatus) -> Self {
568				Self {
569					size: value.size,
570					max_size: value.max_size,
571					messages: convert_vec(value.messages),
572					truncated: value.truncated,
573				}
574			}
575		}
576	};
577}
578
579macro_rules! impl_queue_response_pair {
580	($left:ident, $right:ident) => {
581		impl From<$left::QueueResponse> for $right::QueueResponse {
582			fn from(value: $left::QueueResponse) -> Self {
583				Self {
584					rid: value.rid,
585					status: value.status.into(),
586				}
587			}
588		}
589
590		impl From<$right::QueueResponse> for $left::QueueResponse {
591			fn from(value: $right::QueueResponse) -> Self {
592				Self {
593					rid: value.rid,
594					status: value.status.into(),
595				}
596			}
597		}
598	};
599}
600
601macro_rules! impl_init_pair {
602	($left:ident, $right:ident) => {
603		impl From<$left::Init> for $right::Init {
604			fn from(value: $left::Init) -> Self {
605				Self {
606					connections: convert_vec(value.connections),
607					state: value.state,
608					is_state_enabled: value.is_state_enabled,
609					rpcs: value.rpcs,
610					is_database_enabled: value.is_database_enabled,
611					queue_size: value.queue_size,
612					workflow_history: value.workflow_history,
613					is_workflow_enabled: value.is_workflow_enabled,
614				}
615			}
616		}
617
618		impl From<$right::Init> for $left::Init {
619			fn from(value: $right::Init) -> Self {
620				Self {
621					connections: convert_vec(value.connections),
622					state: value.state,
623					is_state_enabled: value.is_state_enabled,
624					rpcs: value.rpcs,
625					is_database_enabled: value.is_database_enabled,
626					queue_size: value.queue_size,
627					workflow_history: value.workflow_history,
628					is_workflow_enabled: value.is_workflow_enabled,
629				}
630			}
631		}
632	};
633}
634
635macro_rules! impl_common_actor_pair {
636	($left:ident, $right:ident) => {
637		impl_same_fields_pair!($left, $right, PatchStateRequest { state });
638		impl_same_fields_pair!($left, $right, ActionRequest { id, name, args });
639		impl_same_fields_pair!($left, $right, StateRequest { id });
640		impl_same_fields_pair!($left, $right, ConnectionsRequest { id });
641		impl_same_fields_pair!($left, $right, RpcsListRequest { id });
642		impl_same_fields_pair!($left, $right, Connection { id, details });
643		impl_connections_response_pair!($left, $right);
644		impl_connection_list_pair!($left, $right, ConnectionsUpdated);
645		impl_same_fields_pair!(
646			$left,
647			$right,
648			StateResponse {
649				rid,
650				state,
651				is_state_enabled,
652			}
653		);
654		impl_same_fields_pair!($left, $right, ActionResponse { rid, output });
655		impl_same_fields_pair!($left, $right, StateUpdated { state });
656		impl_same_fields_pair!($left, $right, RpcsListResponse { rid, rpcs });
657		impl_same_fields_pair!($left, $right, Error { message });
658	};
659}
660
661macro_rules! impl_queue_workflow_pair {
662	($left:ident, $right:ident) => {
663		impl_same_fields_pair!(
664			$left,
665			$right,
666			TraceQueryRequest {
667				id,
668				start_ms,
669				end_ms,
670				limit,
671			}
672		);
673		impl_same_fields_pair!($left, $right, TraceQueryResponse { rid, payload });
674		impl_same_fields_pair!($left, $right, QueueRequest { id, limit });
675		impl_same_fields_pair!(
676			$left,
677			$right,
678			QueueMessageSummary {
679				id,
680				name,
681				created_at_ms,
682			}
683		);
684		impl_queue_status_pair!($left, $right);
685		impl_queue_response_pair!($left, $right);
686		impl_same_fields_pair!($left, $right, QueueUpdated { queue_size });
687		impl_same_fields_pair!($left, $right, WorkflowHistoryRequest { id });
688		impl_same_fields_pair!(
689			$left,
690			$right,
691			WorkflowHistoryResponse {
692				rid,
693				history,
694				is_workflow_enabled,
695			}
696		);
697		impl_same_fields_pair!($left, $right, WorkflowHistoryUpdated { history });
698		impl_init_pair!($left, $right);
699	};
700}
701
702macro_rules! impl_database_pair {
703	($left:ident, $right:ident) => {
704		impl_same_fields_pair!($left, $right, DatabaseSchemaRequest { id });
705		impl_same_fields_pair!($left, $right, DatabaseSchemaResponse { rid, schema });
706		impl_same_fields_pair!(
707			$left,
708			$right,
709			DatabaseTableRowsRequest {
710				id,
711				table,
712				limit,
713				offset,
714			}
715		);
716		impl_same_fields_pair!($left, $right, DatabaseTableRowsResponse { rid, result });
717	};
718}
719
720impl_common_actor_pair!(v1, v2);
721impl_common_actor_pair!(v2, v3);
722impl_common_actor_pair!(v3, v4);
723impl_queue_workflow_pair!(v2, v3);
724impl_queue_workflow_pair!(v3, v4);
725impl_database_pair!(v3, v4);
726
727impl From<v2::ToServerBody> for v3::ToServerBody {
728	fn from(value: v2::ToServerBody) -> Self {
729		match value {
730			v2::ToServerBody::PatchStateRequest(req) => Self::PatchStateRequest(req.into()),
731			v2::ToServerBody::StateRequest(req) => Self::StateRequest(req.into()),
732			v2::ToServerBody::ConnectionsRequest(req) => Self::ConnectionsRequest(req.into()),
733			v2::ToServerBody::ActionRequest(req) => Self::ActionRequest(req.into()),
734			v2::ToServerBody::RpcsListRequest(req) => Self::RpcsListRequest(req.into()),
735			v2::ToServerBody::TraceQueryRequest(req) => Self::TraceQueryRequest(req.into()),
736			v2::ToServerBody::QueueRequest(req) => Self::QueueRequest(req.into()),
737			v2::ToServerBody::WorkflowHistoryRequest(req) => {
738				Self::WorkflowHistoryRequest(req.into())
739			}
740		}
741	}
742}
743
744impl From<v2::ToServer> for v3::ToServer {
745	fn from(value: v2::ToServer) -> Self {
746		Self {
747			body: value.body.into(),
748		}
749	}
750}
751
752impl From<v2::ToClientBody> for v3::ToClientBody {
753	fn from(value: v2::ToClientBody) -> Self {
754		match value {
755			v2::ToClientBody::StateResponse(resp) => Self::StateResponse(resp.into()),
756			v2::ToClientBody::ConnectionsResponse(resp) => Self::ConnectionsResponse(resp.into()),
757			v2::ToClientBody::ActionResponse(resp) => Self::ActionResponse(resp.into()),
758			v2::ToClientBody::ConnectionsUpdated(update) => Self::ConnectionsUpdated(update.into()),
759			v2::ToClientBody::QueueUpdated(update) => Self::QueueUpdated(update.into()),
760			v2::ToClientBody::StateUpdated(update) => Self::StateUpdated(update.into()),
761			v2::ToClientBody::WorkflowHistoryUpdated(update) => {
762				Self::WorkflowHistoryUpdated(update.into())
763			}
764			v2::ToClientBody::RpcsListResponse(resp) => Self::RpcsListResponse(resp.into()),
765			v2::ToClientBody::TraceQueryResponse(resp) => Self::TraceQueryResponse(resp.into()),
766			v2::ToClientBody::QueueResponse(resp) => Self::QueueResponse(resp.into()),
767			v2::ToClientBody::WorkflowHistoryResponse(resp) => {
768				Self::WorkflowHistoryResponse(resp.into())
769			}
770			v2::ToClientBody::Error(error) => Self::Error(error.into()),
771			v2::ToClientBody::Init(init) => Self::Init(init.into()),
772		}
773	}
774}
775
776impl From<v2::ToClient> for v3::ToClient {
777	fn from(value: v2::ToClient) -> Self {
778		Self {
779			body: value.body.into(),
780		}
781	}
782}
783
784fn dropped_error(message: &str) -> v2::Error {
785	v2::Error {
786		message: message.to_owned(),
787	}
788}
789
790#[cfg(test)]
791mod tests {
792	use super::*;
793
794	#[test]
795	fn v3_database_schema_request_keeps_meaning_when_upgrading_to_v4() {
796		let request = ToServer::V3(v3::ToServer {
797			body: v3::ToServerBody::DatabaseSchemaRequest(v3::DatabaseSchemaRequest {
798				id: Uint(7),
799			}),
800		});
801
802		let ToServer::V4(upgraded) = ToServer::v3_to_v4(request).unwrap() else {
803			panic!("expected v4 request")
804		};
805
806		assert!(matches!(
807			upgraded.body,
808			v4::ToServerBody::DatabaseSchemaRequest(v4::DatabaseSchemaRequest { id }) if id == Uint(7)
809		));
810	}
811
812	#[test]
813	fn v4_workflow_replay_response_downgrades_to_v3_error() {
814		let response = ToClient::V4(v4::ToClient {
815			body: v4::ToClientBody::WorkflowReplayResponse(v4::WorkflowReplayResponse {
816				rid: Uint(11),
817				history: Some(b"workflow".to_vec()),
818				is_workflow_enabled: true,
819			}),
820		});
821
822		let ToClient::V3(downgraded) = ToClient::v4_to_v3(response).unwrap() else {
823			panic!("expected v3 response")
824		};
825
826		assert_eq!(
827			downgraded.body,
828			v3::ToClientBody::Error(v3::Error {
829				message: WORKFLOW_HISTORY_DROPPED_ERROR.to_owned(),
830			})
831		);
832	}
833}