1use anyhow::{Result, bail};
2use serde_bare::Uint;
3use vbare::OwnedVersionedData;
4
5use crate::generated::{v1, v2, v3, v4};
6
/// Error message used when a workflow-history or workflow-replay response is
/// replaced by an `Error` body during a downgrade.
const WORKFLOW_HISTORY_DROPPED_ERROR: &str = "inspector.workflow_history_dropped";

/// Error message used when a queue update/response is replaced by an `Error`
/// body during a downgrade.
const QUEUE_DROPPED_ERROR: &str = "inspector.queue_dropped";

/// Error message used when a trace query response is replaced by an `Error`
/// body during a downgrade.
const TRACE_DROPPED_ERROR: &str = "inspector.trace_dropped";

/// Error message used when a database response is replaced by an `Error` body
/// during a downgrade.
const DATABASE_DROPPED_ERROR: &str = "inspector.database_dropped";
11
12pub enum ToServer {
13 V1(v1::ToServer),
14 V2(v2::ToServer),
15 V3(v3::ToServer),
16 V4(v4::ToServer),
17}
18
19impl OwnedVersionedData for ToServer {
20 type Latest = v4::ToServer;
21
22 fn wrap_latest(latest: Self::Latest) -> Self {
23 Self::V4(latest)
24 }
25
26 fn unwrap_latest(self) -> Result<Self::Latest> {
27 match self {
28 Self::V4(data) => Ok(data),
29 _ => bail!("version not latest"),
30 }
31 }
32
33 fn deserialize_version(payload: &[u8], version: u16) -> Result<Self> {
34 match version {
35 1 => Ok(Self::V1(serde_bare::from_slice(payload)?)),
36 2 => Ok(Self::V2(serde_bare::from_slice(payload)?)),
37 3 => Ok(Self::V3(serde_bare::from_slice(payload)?)),
38 4 => Ok(Self::V4(serde_bare::from_slice(payload)?)),
39 _ => bail!("invalid inspector protocol version for ToServer: {version}"),
40 }
41 }
42
43 fn serialize_version(self, version: u16) -> Result<Vec<u8>> {
44 match (self, version) {
45 (Self::V1(data), 1) => serde_bare::to_vec(&data).map_err(Into::into),
46 (Self::V2(data), 2) => serde_bare::to_vec(&data).map_err(Into::into),
47 (Self::V3(data), 3) => serde_bare::to_vec(&data).map_err(Into::into),
48 (Self::V4(data), 4) => serde_bare::to_vec(&data).map_err(Into::into),
49 (_, version) => bail!("unexpected inspector protocol version for ToServer: {version}"),
50 }
51 }
52
53 fn deserialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
54 vec![Self::v1_to_v2, Self::v2_to_v3, Self::v3_to_v4]
55 }
56
57 fn serialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
58 vec![Self::v4_to_v3, Self::v3_to_v2, Self::v2_to_v1]
59 }
60}
61
62impl ToServer {
63 fn v1_to_v2(self) -> Result<Self> {
64 let Self::V1(data) = self else {
65 bail!("expected inspector protocol v1 ToServer")
66 };
67
68 let body = match data.body {
69 v1::ToServerBody::PatchStateRequest(req) => {
70 v2::ToServerBody::PatchStateRequest(req.into())
71 }
72 v1::ToServerBody::StateRequest(req) => v2::ToServerBody::StateRequest(req.into()),
73 v1::ToServerBody::ConnectionsRequest(req) => {
74 v2::ToServerBody::ConnectionsRequest(req.into())
75 }
76 v1::ToServerBody::ActionRequest(req) => v2::ToServerBody::ActionRequest(req.into()),
77 v1::ToServerBody::RpcsListRequest(req) => v2::ToServerBody::RpcsListRequest(req.into()),
78 v1::ToServerBody::EventsRequest(_) | v1::ToServerBody::ClearEventsRequest(_) => {
79 bail!("cannot convert inspector v1 events requests to v2")
80 }
81 };
82
83 Ok(Self::V2(v2::ToServer { body }))
84 }
85
86 fn v2_to_v3(self) -> Result<Self> {
87 let Self::V2(data) = self else {
88 bail!("expected inspector protocol v2 ToServer")
89 };
90 Ok(Self::V3(data.into()))
91 }
92
93 fn v3_to_v4(self) -> Result<Self> {
94 let Self::V3(data) = self else {
95 bail!("expected inspector protocol v3 ToServer")
96 };
97
98 let body = match data.body {
99 v3::ToServerBody::PatchStateRequest(req) => {
100 v4::ToServerBody::PatchStateRequest(req.into())
101 }
102 v3::ToServerBody::StateRequest(req) => v4::ToServerBody::StateRequest(req.into()),
103 v3::ToServerBody::ConnectionsRequest(req) => {
104 v4::ToServerBody::ConnectionsRequest(req.into())
105 }
106 v3::ToServerBody::ActionRequest(req) => v4::ToServerBody::ActionRequest(req.into()),
107 v3::ToServerBody::RpcsListRequest(req) => v4::ToServerBody::RpcsListRequest(req.into()),
108 v3::ToServerBody::TraceQueryRequest(req) => {
109 v4::ToServerBody::TraceQueryRequest(req.into())
110 }
111 v3::ToServerBody::QueueRequest(req) => v4::ToServerBody::QueueRequest(req.into()),
112 v3::ToServerBody::WorkflowHistoryRequest(req) => {
113 v4::ToServerBody::WorkflowHistoryRequest(req.into())
114 }
115 v3::ToServerBody::DatabaseSchemaRequest(req) => {
116 v4::ToServerBody::DatabaseSchemaRequest(req.into())
117 }
118 v3::ToServerBody::DatabaseTableRowsRequest(req) => {
119 v4::ToServerBody::DatabaseTableRowsRequest(req.into())
120 }
121 };
122
123 Ok(Self::V4(v4::ToServer { body }))
124 }
125
126 fn v4_to_v3(self) -> Result<Self> {
127 let Self::V4(data) = self else {
128 bail!("expected inspector protocol v4 ToServer")
129 };
130
131 let body = match data.body {
132 v4::ToServerBody::PatchStateRequest(req) => {
133 v3::ToServerBody::PatchStateRequest(req.into())
134 }
135 v4::ToServerBody::StateRequest(req) => v3::ToServerBody::StateRequest(req.into()),
136 v4::ToServerBody::ConnectionsRequest(req) => {
137 v3::ToServerBody::ConnectionsRequest(req.into())
138 }
139 v4::ToServerBody::ActionRequest(req) => v3::ToServerBody::ActionRequest(req.into()),
140 v4::ToServerBody::RpcsListRequest(req) => v3::ToServerBody::RpcsListRequest(req.into()),
141 v4::ToServerBody::TraceQueryRequest(req) => {
142 v3::ToServerBody::TraceQueryRequest(req.into())
143 }
144 v4::ToServerBody::QueueRequest(req) => v3::ToServerBody::QueueRequest(req.into()),
145 v4::ToServerBody::WorkflowHistoryRequest(req) => {
146 v3::ToServerBody::WorkflowHistoryRequest(req.into())
147 }
148 v4::ToServerBody::WorkflowReplayRequest(_) => {
149 bail!("cannot convert inspector v4 workflow replay requests to v3")
150 }
151 v4::ToServerBody::DatabaseSchemaRequest(req) => {
152 v3::ToServerBody::DatabaseSchemaRequest(req.into())
153 }
154 v4::ToServerBody::DatabaseTableRowsRequest(req) => {
155 v3::ToServerBody::DatabaseTableRowsRequest(req.into())
156 }
157 };
158
159 Ok(Self::V3(v3::ToServer { body }))
160 }
161
162 fn v3_to_v2(self) -> Result<Self> {
163 let Self::V3(data) = self else {
164 bail!("expected inspector protocol v3 ToServer")
165 };
166
167 let body = match data.body {
168 v3::ToServerBody::PatchStateRequest(req) => {
169 v2::ToServerBody::PatchStateRequest(req.into())
170 }
171 v3::ToServerBody::StateRequest(req) => v2::ToServerBody::StateRequest(req.into()),
172 v3::ToServerBody::ConnectionsRequest(req) => {
173 v2::ToServerBody::ConnectionsRequest(req.into())
174 }
175 v3::ToServerBody::ActionRequest(req) => v2::ToServerBody::ActionRequest(req.into()),
176 v3::ToServerBody::RpcsListRequest(req) => v2::ToServerBody::RpcsListRequest(req.into()),
177 v3::ToServerBody::TraceQueryRequest(req) => {
178 v2::ToServerBody::TraceQueryRequest(req.into())
179 }
180 v3::ToServerBody::QueueRequest(req) => v2::ToServerBody::QueueRequest(req.into()),
181 v3::ToServerBody::WorkflowHistoryRequest(req) => {
182 v2::ToServerBody::WorkflowHistoryRequest(req.into())
183 }
184 v3::ToServerBody::DatabaseSchemaRequest(_)
185 | v3::ToServerBody::DatabaseTableRowsRequest(_) => {
186 bail!("cannot convert inspector v3 database requests to v2")
187 }
188 };
189
190 Ok(Self::V2(v2::ToServer { body }))
191 }
192
193 fn v2_to_v1(self) -> Result<Self> {
194 let Self::V2(data) = self else {
195 bail!("expected inspector protocol v2 ToServer")
196 };
197
198 let body = match data.body {
199 v2::ToServerBody::PatchStateRequest(req) => {
200 v1::ToServerBody::PatchStateRequest(req.into())
201 }
202 v2::ToServerBody::StateRequest(req) => v1::ToServerBody::StateRequest(req.into()),
203 v2::ToServerBody::ConnectionsRequest(req) => {
204 v1::ToServerBody::ConnectionsRequest(req.into())
205 }
206 v2::ToServerBody::ActionRequest(req) => v1::ToServerBody::ActionRequest(req.into()),
207 v2::ToServerBody::RpcsListRequest(req) => v1::ToServerBody::RpcsListRequest(req.into()),
208 v2::ToServerBody::TraceQueryRequest(_)
209 | v2::ToServerBody::QueueRequest(_)
210 | v2::ToServerBody::WorkflowHistoryRequest(_) => {
211 bail!("cannot convert inspector v2 queue/trace/workflow requests to v1")
212 }
213 };
214
215 Ok(Self::V1(v1::ToServer { body }))
216 }
217}
218
219pub enum ToClient {
220 V1(v1::ToClient),
221 V2(v2::ToClient),
222 V3(v3::ToClient),
223 V4(v4::ToClient),
224}
225
226impl OwnedVersionedData for ToClient {
227 type Latest = v4::ToClient;
228
229 fn wrap_latest(latest: Self::Latest) -> Self {
230 Self::V4(latest)
231 }
232
233 fn unwrap_latest(self) -> Result<Self::Latest> {
234 match self {
235 Self::V4(data) => Ok(data),
236 _ => bail!("version not latest"),
237 }
238 }
239
240 fn deserialize_version(payload: &[u8], version: u16) -> Result<Self> {
241 match version {
242 1 => Ok(Self::V1(serde_bare::from_slice(payload)?)),
243 2 => Ok(Self::V2(serde_bare::from_slice(payload)?)),
244 3 => Ok(Self::V3(serde_bare::from_slice(payload)?)),
245 4 => Ok(Self::V4(serde_bare::from_slice(payload)?)),
246 _ => bail!("invalid inspector protocol version for ToClient: {version}"),
247 }
248 }
249
250 fn serialize_version(self, version: u16) -> Result<Vec<u8>> {
251 match (self, version) {
252 (Self::V1(data), 1) => serde_bare::to_vec(&data).map_err(Into::into),
253 (Self::V2(data), 2) => serde_bare::to_vec(&data).map_err(Into::into),
254 (Self::V3(data), 3) => serde_bare::to_vec(&data).map_err(Into::into),
255 (Self::V4(data), 4) => serde_bare::to_vec(&data).map_err(Into::into),
256 (_, version) => bail!("unexpected inspector protocol version for ToClient: {version}"),
257 }
258 }
259
260 fn deserialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
261 vec![Self::v1_to_v2, Self::v2_to_v3, Self::v3_to_v4]
262 }
263
264 fn serialize_converters() -> Vec<impl Fn(Self) -> Result<Self>> {
265 vec![Self::v4_to_v3, Self::v3_to_v2, Self::v2_to_v1]
266 }
267}
268
269impl ToClient {
270 fn v1_to_v2(self) -> Result<Self> {
271 let Self::V1(data) = self else {
272 bail!("expected inspector protocol v1 ToClient")
273 };
274
275 let body = match data.body {
276 v1::ToClientBody::StateResponse(resp) => v2::ToClientBody::StateResponse(resp.into()),
277 v1::ToClientBody::ConnectionsResponse(resp) => {
278 v2::ToClientBody::ConnectionsResponse(resp.into())
279 }
280 v1::ToClientBody::ActionResponse(resp) => v2::ToClientBody::ActionResponse(resp.into()),
281 v1::ToClientBody::RpcsListResponse(resp) => {
282 v2::ToClientBody::RpcsListResponse(resp.into())
283 }
284 v1::ToClientBody::ConnectionsUpdated(update) => {
285 v2::ToClientBody::ConnectionsUpdated(update.into())
286 }
287 v1::ToClientBody::StateUpdated(update) => v2::ToClientBody::StateUpdated(update.into()),
288 v1::ToClientBody::Error(error) => v2::ToClientBody::Error(error.into()),
289 v1::ToClientBody::Init(init) => v2::ToClientBody::Init(v2::Init {
290 connections: convert_vec(init.connections),
291 state: init.state,
292 is_state_enabled: init.is_state_enabled,
293 rpcs: init.rpcs,
294 is_database_enabled: init.is_database_enabled,
295 queue_size: Uint(0),
296 workflow_history: None,
297 is_workflow_enabled: false,
298 }),
299 v1::ToClientBody::EventsResponse(_) | v1::ToClientBody::EventsUpdated(_) => {
300 bail!("cannot convert inspector v1 events responses to v2")
301 }
302 };
303
304 Ok(Self::V2(v2::ToClient { body }))
305 }
306
307 fn v2_to_v3(self) -> Result<Self> {
308 let Self::V2(data) = self else {
309 bail!("expected inspector protocol v2 ToClient")
310 };
311 Ok(Self::V3(data.into()))
312 }
313
314 fn v3_to_v4(self) -> Result<Self> {
315 let Self::V3(data) = self else {
316 bail!("expected inspector protocol v3 ToClient")
317 };
318
319 let body = match data.body {
320 v3::ToClientBody::StateResponse(resp) => v4::ToClientBody::StateResponse(resp.into()),
321 v3::ToClientBody::ConnectionsResponse(resp) => {
322 v4::ToClientBody::ConnectionsResponse(resp.into())
323 }
324 v3::ToClientBody::ActionResponse(resp) => v4::ToClientBody::ActionResponse(resp.into()),
325 v3::ToClientBody::ConnectionsUpdated(update) => {
326 v4::ToClientBody::ConnectionsUpdated(update.into())
327 }
328 v3::ToClientBody::QueueUpdated(update) => v4::ToClientBody::QueueUpdated(update.into()),
329 v3::ToClientBody::StateUpdated(update) => v4::ToClientBody::StateUpdated(update.into()),
330 v3::ToClientBody::WorkflowHistoryUpdated(update) => {
331 v4::ToClientBody::WorkflowHistoryUpdated(update.into())
332 }
333 v3::ToClientBody::RpcsListResponse(resp) => {
334 v4::ToClientBody::RpcsListResponse(resp.into())
335 }
336 v3::ToClientBody::TraceQueryResponse(resp) => {
337 v4::ToClientBody::TraceQueryResponse(resp.into())
338 }
339 v3::ToClientBody::QueueResponse(resp) => v4::ToClientBody::QueueResponse(resp.into()),
340 v3::ToClientBody::WorkflowHistoryResponse(resp) => {
341 v4::ToClientBody::WorkflowHistoryResponse(resp.into())
342 }
343 v3::ToClientBody::Error(error) => v4::ToClientBody::Error(error.into()),
344 v3::ToClientBody::Init(init) => v4::ToClientBody::Init(init.into()),
345 v3::ToClientBody::DatabaseSchemaResponse(resp) => {
346 v4::ToClientBody::DatabaseSchemaResponse(resp.into())
347 }
348 v3::ToClientBody::DatabaseTableRowsResponse(resp) => {
349 v4::ToClientBody::DatabaseTableRowsResponse(resp.into())
350 }
351 };
352
353 Ok(Self::V4(v4::ToClient { body }))
354 }
355
356 fn v4_to_v3(self) -> Result<Self> {
357 let Self::V4(data) = self else {
358 bail!("expected inspector protocol v4 ToClient")
359 };
360
361 let body = match data.body {
362 v4::ToClientBody::StateResponse(resp) => v3::ToClientBody::StateResponse(resp.into()),
363 v4::ToClientBody::ConnectionsResponse(resp) => {
364 v3::ToClientBody::ConnectionsResponse(resp.into())
365 }
366 v4::ToClientBody::ActionResponse(resp) => v3::ToClientBody::ActionResponse(resp.into()),
367 v4::ToClientBody::ConnectionsUpdated(update) => {
368 v3::ToClientBody::ConnectionsUpdated(update.into())
369 }
370 v4::ToClientBody::QueueUpdated(update) => v3::ToClientBody::QueueUpdated(update.into()),
371 v4::ToClientBody::StateUpdated(update) => v3::ToClientBody::StateUpdated(update.into()),
372 v4::ToClientBody::WorkflowHistoryUpdated(update) => {
373 v3::ToClientBody::WorkflowHistoryUpdated(update.into())
374 }
375 v4::ToClientBody::RpcsListResponse(resp) => {
376 v3::ToClientBody::RpcsListResponse(resp.into())
377 }
378 v4::ToClientBody::TraceQueryResponse(resp) => {
379 v3::ToClientBody::TraceQueryResponse(resp.into())
380 }
381 v4::ToClientBody::QueueResponse(resp) => v3::ToClientBody::QueueResponse(resp.into()),
382 v4::ToClientBody::WorkflowHistoryResponse(resp) => {
383 v3::ToClientBody::WorkflowHistoryResponse(resp.into())
384 }
385 v4::ToClientBody::WorkflowReplayResponse(_) => {
386 v3::ToClientBody::Error(dropped_error(WORKFLOW_HISTORY_DROPPED_ERROR).into())
387 }
388 v4::ToClientBody::Error(error) => v3::ToClientBody::Error(error.into()),
389 v4::ToClientBody::Init(init) => v3::ToClientBody::Init(init.into()),
390 v4::ToClientBody::DatabaseSchemaResponse(resp) => {
391 v3::ToClientBody::DatabaseSchemaResponse(resp.into())
392 }
393 v4::ToClientBody::DatabaseTableRowsResponse(resp) => {
394 v3::ToClientBody::DatabaseTableRowsResponse(resp.into())
395 }
396 };
397
398 Ok(Self::V3(v3::ToClient { body }))
399 }
400
401 fn v3_to_v2(self) -> Result<Self> {
402 let Self::V3(data) = self else {
403 bail!("expected inspector protocol v3 ToClient")
404 };
405
406 let body = match data.body {
407 v3::ToClientBody::StateResponse(resp) => v2::ToClientBody::StateResponse(resp.into()),
408 v3::ToClientBody::ConnectionsResponse(resp) => {
409 v2::ToClientBody::ConnectionsResponse(resp.into())
410 }
411 v3::ToClientBody::ActionResponse(resp) => v2::ToClientBody::ActionResponse(resp.into()),
412 v3::ToClientBody::ConnectionsUpdated(update) => {
413 v2::ToClientBody::ConnectionsUpdated(update.into())
414 }
415 v3::ToClientBody::QueueUpdated(update) => v2::ToClientBody::QueueUpdated(update.into()),
416 v3::ToClientBody::StateUpdated(update) => v2::ToClientBody::StateUpdated(update.into()),
417 v3::ToClientBody::WorkflowHistoryUpdated(update) => {
418 v2::ToClientBody::WorkflowHistoryUpdated(update.into())
419 }
420 v3::ToClientBody::RpcsListResponse(resp) => {
421 v2::ToClientBody::RpcsListResponse(resp.into())
422 }
423 v3::ToClientBody::TraceQueryResponse(resp) => {
424 v2::ToClientBody::TraceQueryResponse(resp.into())
425 }
426 v3::ToClientBody::QueueResponse(resp) => v2::ToClientBody::QueueResponse(resp.into()),
427 v3::ToClientBody::WorkflowHistoryResponse(resp) => {
428 v2::ToClientBody::WorkflowHistoryResponse(resp.into())
429 }
430 v3::ToClientBody::Error(error) => v2::ToClientBody::Error(error.into()),
431 v3::ToClientBody::Init(init) => v2::ToClientBody::Init(init.into()),
432 v3::ToClientBody::DatabaseSchemaResponse(_)
433 | v3::ToClientBody::DatabaseTableRowsResponse(_) => {
434 v2::ToClientBody::Error(dropped_error(DATABASE_DROPPED_ERROR))
435 }
436 };
437
438 Ok(Self::V2(v2::ToClient { body }))
439 }
440
441 fn v2_to_v1(self) -> Result<Self> {
442 let Self::V2(data) = self else {
443 bail!("expected inspector protocol v2 ToClient")
444 };
445
446 let body = match data.body {
447 v2::ToClientBody::StateResponse(resp) => v1::ToClientBody::StateResponse(resp.into()),
448 v2::ToClientBody::ConnectionsResponse(resp) => {
449 v1::ToClientBody::ConnectionsResponse(resp.into())
450 }
451 v2::ToClientBody::ActionResponse(resp) => v1::ToClientBody::ActionResponse(resp.into()),
452 v2::ToClientBody::ConnectionsUpdated(update) => {
453 v1::ToClientBody::ConnectionsUpdated(update.into())
454 }
455 v2::ToClientBody::StateUpdated(update) => v1::ToClientBody::StateUpdated(update.into()),
456 v2::ToClientBody::RpcsListResponse(resp) => {
457 v1::ToClientBody::RpcsListResponse(resp.into())
458 }
459 v2::ToClientBody::Error(error) => v1::ToClientBody::Error(error.into()),
460 v2::ToClientBody::Init(init) => v1::ToClientBody::Init(v1::Init {
461 connections: init.connections.into_iter().map(Into::into).collect(),
462 events: Vec::new(),
463 state: init.state,
464 is_state_enabled: init.is_state_enabled,
465 rpcs: init.rpcs,
466 is_database_enabled: init.is_database_enabled,
467 }),
468 v2::ToClientBody::QueueUpdated(_) | v2::ToClientBody::QueueResponse(_) => {
469 v1::ToClientBody::Error(dropped_error(QUEUE_DROPPED_ERROR).into())
470 }
471 v2::ToClientBody::WorkflowHistoryUpdated(_)
472 | v2::ToClientBody::WorkflowHistoryResponse(_) => {
473 v1::ToClientBody::Error(dropped_error(WORKFLOW_HISTORY_DROPPED_ERROR).into())
474 }
475 v2::ToClientBody::TraceQueryResponse(_) => {
476 v1::ToClientBody::Error(dropped_error(TRACE_DROPPED_ERROR).into())
477 }
478 };
479
480 Ok(Self::V1(v1::ToClient { body }))
481 }
482}
483
/// Converts every element of `values` with [`Into`], preserving order.
///
/// The type parameters are named `T`/`U` so they do not shadow the prelude
/// `From` trait inside this function.
fn convert_vec<T, U>(values: Vec<T>) -> Vec<U>
where
    T: Into<U>,
{
    values.into_iter().map(Into::into).collect()
}
490
/// Generates `From` impls in both directions between `$left::$ty` and
/// `$right::$ty` by moving each listed field across unchanged.
///
/// The internal `@one` rule emits a single direction; the public rule expands
/// it twice with the modules swapped.
macro_rules! impl_same_fields_pair {
    (@one $src:ident => $dst:ident, $ty:ident { $($field:ident),+ }) => {
        impl From<$src::$ty> for $dst::$ty {
            fn from(value: $src::$ty) -> Self {
                Self {
                    $($field: value.$field),+
                }
            }
        }
    };
    ($left:ident, $right:ident, $ty:ident { $($field:ident),+ $(,)? }) => {
        impl_same_fields_pair!(@one $left => $right, $ty { $($field),+ });
        impl_same_fields_pair!(@one $right => $left, $ty { $($field),+ });
    };
}
510
/// Generates `From` impls in both directions between `$left::$ty` and
/// `$right::$ty` for a struct whose only field is `connections`, converting
/// each element with `convert_vec`.
macro_rules! impl_connection_list_pair {
    (@one $src:ident => $dst:ident, $ty:ident) => {
        impl From<$src::$ty> for $dst::$ty {
            fn from(value: $src::$ty) -> Self {
                let connections = convert_vec(value.connections);
                Self { connections }
            }
        }
    };
    ($left:ident, $right:ident, $ty:ident) => {
        impl_connection_list_pair!(@one $left => $right, $ty);
        impl_connection_list_pair!(@one $right => $left, $ty);
    };
}
530
/// Generates `From` impls in both directions between the two modules'
/// `ConnectionsResponse` types: `rid` is moved as-is and `connections` is
/// converted element by element with `convert_vec`.
macro_rules! impl_connections_response_pair {
    (@one $src:ident => $dst:ident) => {
        impl From<$src::ConnectionsResponse> for $dst::ConnectionsResponse {
            fn from(value: $src::ConnectionsResponse) -> Self {
                let connections = convert_vec(value.connections);
                Self {
                    rid: value.rid,
                    connections,
                }
            }
        }
    };
    ($left:ident, $right:ident) => {
        impl_connections_response_pair!(@one $left => $right);
        impl_connections_response_pair!(@one $right => $left);
    };
}
552
/// Generates `From` impls in both directions between the two modules'
/// `QueueStatus` types: scalar fields are moved as-is and `messages` is
/// converted element by element with `convert_vec`.
macro_rules! impl_queue_status_pair {
    (@one $src:ident => $dst:ident) => {
        impl From<$src::QueueStatus> for $dst::QueueStatus {
            fn from(value: $src::QueueStatus) -> Self {
                let messages = convert_vec(value.messages);
                Self {
                    size: value.size,
                    max_size: value.max_size,
                    messages,
                    truncated: value.truncated,
                }
            }
        }
    };
    ($left:ident, $right:ident) => {
        impl_queue_status_pair!(@one $left => $right);
        impl_queue_status_pair!(@one $right => $left);
    };
}
578
/// Generates `From` impls in both directions between the two modules'
/// `QueueResponse` types: `rid` is moved as-is and `status` goes through its
/// own `Into` conversion.
macro_rules! impl_queue_response_pair {
    (@one $src:ident => $dst:ident) => {
        impl From<$src::QueueResponse> for $dst::QueueResponse {
            fn from(value: $src::QueueResponse) -> Self {
                let status = value.status.into();
                Self {
                    rid: value.rid,
                    status,
                }
            }
        }
    };
    ($left:ident, $right:ident) => {
        impl_queue_response_pair!(@one $left => $right);
        impl_queue_response_pair!(@one $right => $left);
    };
}
600
/// Generates `From` impls in both directions between the two modules' `Init`
/// types. `connections` is converted element by element with `convert_vec`;
/// every other field is moved across unchanged.
macro_rules! impl_init_pair {
    (@one $src:ident => $dst:ident) => {
        impl From<$src::Init> for $dst::Init {
            fn from(value: $src::Init) -> Self {
                let connections = convert_vec(value.connections);
                Self {
                    connections,
                    state: value.state,
                    is_state_enabled: value.is_state_enabled,
                    rpcs: value.rpcs,
                    is_database_enabled: value.is_database_enabled,
                    queue_size: value.queue_size,
                    workflow_history: value.workflow_history,
                    is_workflow_enabled: value.is_workflow_enabled,
                }
            }
        }
    };
    ($left:ident, $right:ident) => {
        impl_init_pair!(@one $left => $right);
        impl_init_pair!(@one $right => $left);
    };
}
634
/// Generates the two-way `From` impls for the message types this file treats
/// as common to both modules `$a` and `$b`: the state/action/connection/RPC
/// requests and responses, `Connection`, and `Error`.
macro_rules! impl_common_actor_pair {
    ($a:ident, $b:ident) => {
        // Requests.
        impl_same_fields_pair!($a, $b, PatchStateRequest { state });
        impl_same_fields_pair!($a, $b, StateRequest { id });
        impl_same_fields_pair!($a, $b, ConnectionsRequest { id });
        impl_same_fields_pair!($a, $b, ActionRequest { id, name, args });
        impl_same_fields_pair!($a, $b, RpcsListRequest { id });

        // Responses.
        impl_same_fields_pair!($a, $b, StateResponse { rid, state, is_state_enabled });
        impl_connections_response_pair!($a, $b);
        impl_same_fields_pair!($a, $b, ActionResponse { rid, output });
        impl_same_fields_pair!($a, $b, RpcsListResponse { rid, rpcs });

        // Updates and shared payload types.
        impl_same_fields_pair!($a, $b, Connection { id, details });
        impl_connection_list_pair!($a, $b, ConnectionsUpdated);
        impl_same_fields_pair!($a, $b, StateUpdated { state });
        impl_same_fields_pair!($a, $b, Error { message });
    };
}
660
/// Generates the two-way `From` impls for the trace, queue, and
/// workflow-history message types of modules `$a` and `$b`, plus their `Init`
/// types.
macro_rules! impl_queue_workflow_pair {
    ($a:ident, $b:ident) => {
        // Trace queries.
        impl_same_fields_pair!($a, $b, TraceQueryRequest { id, start_ms, end_ms, limit });
        impl_same_fields_pair!($a, $b, TraceQueryResponse { rid, payload });

        // Queue inspection.
        impl_same_fields_pair!($a, $b, QueueRequest { id, limit });
        impl_same_fields_pair!($a, $b, QueueMessageSummary { id, name, created_at_ms });
        impl_queue_status_pair!($a, $b);
        impl_queue_response_pair!($a, $b);
        impl_same_fields_pair!($a, $b, QueueUpdated { queue_size });

        // Workflow history.
        impl_same_fields_pair!($a, $b, WorkflowHistoryRequest { id });
        impl_same_fields_pair!(
            $a,
            $b,
            WorkflowHistoryResponse { rid, history, is_workflow_enabled }
        );
        impl_same_fields_pair!($a, $b, WorkflowHistoryUpdated { history });

        // Init carries queue/workflow fields, so it is generated here.
        impl_init_pair!($a, $b);
    };
}
701
/// Generates the two-way `From` impls for the database schema and table-rows
/// request/response types of modules `$a` and `$b`.
macro_rules! impl_database_pair {
    ($a:ident, $b:ident) => {
        // Schema lookup.
        impl_same_fields_pair!($a, $b, DatabaseSchemaRequest { id });
        impl_same_fields_pair!($a, $b, DatabaseSchemaResponse { rid, schema });

        // Table row paging.
        impl_same_fields_pair!($a, $b, DatabaseTableRowsRequest { id, table, limit, offset });
        impl_same_fields_pair!($a, $b, DatabaseTableRowsResponse { rid, result });
    };
}
719
720impl_common_actor_pair!(v1, v2);
721impl_common_actor_pair!(v2, v3);
722impl_common_actor_pair!(v3, v4);
723impl_queue_workflow_pair!(v2, v3);
724impl_queue_workflow_pair!(v3, v4);
725impl_database_pair!(v3, v4);
726
727impl From<v2::ToServerBody> for v3::ToServerBody {
728 fn from(value: v2::ToServerBody) -> Self {
729 match value {
730 v2::ToServerBody::PatchStateRequest(req) => Self::PatchStateRequest(req.into()),
731 v2::ToServerBody::StateRequest(req) => Self::StateRequest(req.into()),
732 v2::ToServerBody::ConnectionsRequest(req) => Self::ConnectionsRequest(req.into()),
733 v2::ToServerBody::ActionRequest(req) => Self::ActionRequest(req.into()),
734 v2::ToServerBody::RpcsListRequest(req) => Self::RpcsListRequest(req.into()),
735 v2::ToServerBody::TraceQueryRequest(req) => Self::TraceQueryRequest(req.into()),
736 v2::ToServerBody::QueueRequest(req) => Self::QueueRequest(req.into()),
737 v2::ToServerBody::WorkflowHistoryRequest(req) => {
738 Self::WorkflowHistoryRequest(req.into())
739 }
740 }
741 }
742}
743
744impl From<v2::ToServer> for v3::ToServer {
745 fn from(value: v2::ToServer) -> Self {
746 Self {
747 body: value.body.into(),
748 }
749 }
750}
751
752impl From<v2::ToClientBody> for v3::ToClientBody {
753 fn from(value: v2::ToClientBody) -> Self {
754 match value {
755 v2::ToClientBody::StateResponse(resp) => Self::StateResponse(resp.into()),
756 v2::ToClientBody::ConnectionsResponse(resp) => Self::ConnectionsResponse(resp.into()),
757 v2::ToClientBody::ActionResponse(resp) => Self::ActionResponse(resp.into()),
758 v2::ToClientBody::ConnectionsUpdated(update) => Self::ConnectionsUpdated(update.into()),
759 v2::ToClientBody::QueueUpdated(update) => Self::QueueUpdated(update.into()),
760 v2::ToClientBody::StateUpdated(update) => Self::StateUpdated(update.into()),
761 v2::ToClientBody::WorkflowHistoryUpdated(update) => {
762 Self::WorkflowHistoryUpdated(update.into())
763 }
764 v2::ToClientBody::RpcsListResponse(resp) => Self::RpcsListResponse(resp.into()),
765 v2::ToClientBody::TraceQueryResponse(resp) => Self::TraceQueryResponse(resp.into()),
766 v2::ToClientBody::QueueResponse(resp) => Self::QueueResponse(resp.into()),
767 v2::ToClientBody::WorkflowHistoryResponse(resp) => {
768 Self::WorkflowHistoryResponse(resp.into())
769 }
770 v2::ToClientBody::Error(error) => Self::Error(error.into()),
771 v2::ToClientBody::Init(init) => Self::Init(init.into()),
772 }
773 }
774}
775
776impl From<v2::ToClient> for v3::ToClient {
777 fn from(value: v2::ToClient) -> Self {
778 Self {
779 body: value.body.into(),
780 }
781 }
782}
783
784fn dropped_error(message: &str) -> v2::Error {
785 v2::Error {
786 message: message.to_owned(),
787 }
788}
789
#[cfg(test)]
mod tests {
    use super::*;

    /// Upgrading a v3 `DatabaseSchemaRequest` yields the v4 variant of the
    /// same name carrying the same id.
    #[test]
    fn v3_database_schema_request_keeps_meaning_when_upgrading_to_v4() {
        let schema_request = v3::DatabaseSchemaRequest { id: Uint(7) };
        let original = ToServer::V3(v3::ToServer {
            body: v3::ToServerBody::DatabaseSchemaRequest(schema_request),
        });

        let upgraded = match ToServer::v3_to_v4(original).unwrap() {
            ToServer::V4(message) => message,
            _ => panic!("expected v4 request"),
        };

        let is_same_request = matches!(
            upgraded.body,
            v4::ToServerBody::DatabaseSchemaRequest(v4::DatabaseSchemaRequest { id })
                if id == Uint(7)
        );
        assert!(is_same_request);
    }

    /// Downgrading a v4 `WorkflowReplayResponse` yields a v3 `Error` body
    /// carrying `WORKFLOW_HISTORY_DROPPED_ERROR`.
    #[test]
    fn v4_workflow_replay_response_downgrades_to_v3_error() {
        let replay = v4::WorkflowReplayResponse {
            rid: Uint(11),
            history: Some(b"workflow".to_vec()),
            is_workflow_enabled: true,
        };
        let original = ToClient::V4(v4::ToClient {
            body: v4::ToClientBody::WorkflowReplayResponse(replay),
        });

        let downgraded = match ToClient::v4_to_v3(original).unwrap() {
            ToClient::V3(message) => message,
            _ => panic!("expected v3 response"),
        };

        let expected = v3::ToClientBody::Error(v3::Error {
            message: WORKFLOW_HISTORY_DROPPED_ERROR.to_owned(),
        });
        assert_eq!(downgraded.body, expected);
    }
}