1use {
2 crate::protobuf::decode::{
3 LimitedDecode, SubscribeUpdateLimitedDecode, UpdateOneofLimitedDecode,
4 UpdateOneofLimitedDecodeAccount, UpdateOneofLimitedDecodeEntry,
5 UpdateOneofLimitedDecodeSlot, UpdateOneofLimitedDecodeTransaction,
6 UpdateOneofLimitedDecodeTransactionInfo,
7 },
8 prost::{
9 Message as _,
10 bytes::Buf,
11 encoding::{decode_varint, encode_varint, encoded_len_varint},
12 },
13 prost_types::Timestamp,
14 richat_proto::{
15 convert_from,
16 geyser::{
17 SlotStatus, SubscribeUpdate, SubscribeUpdateAccountInfo, SubscribeUpdateBlockMeta,
18 SubscribeUpdateEntry, SubscribeUpdateTransactionInfo, subscribe_update::UpdateOneof,
19 },
20 solana::storage::confirmed_block::{TransactionError, TransactionStatusMeta},
21 },
22 serde::{Deserialize, Serialize},
23 solana_account::ReadableAccount,
24 solana_clock::{Epoch, Slot},
25 solana_pubkey::{PUBKEY_BYTES, Pubkey},
26 solana_signature::{SIGNATURE_BYTES, Signature},
27 solana_transaction_status::{
28 ConfirmedBlock, TransactionWithStatusMeta, VersionedTransactionWithStatusMeta,
29 },
30 std::{
31 borrow::Cow,
32 collections::HashSet,
33 ops::{Deref, Range},
34 sync::{Arc, OnceLock},
35 },
36 thiserror::Error,
37};
38
39#[derive(Debug, Error)]
40pub enum MessageParseError {
41 #[error(transparent)]
42 Prost(#[from] prost::DecodeError),
43 #[error("Field `{0}` should be defined")]
44 FieldNotDefined(&'static str),
45 #[error("Invalid enum value: {0}")]
46 InvalidEnumValue(i32),
47 #[error("Invalid pubkey length")]
48 InvalidPubkey,
49 #[error("Invalid update: {0}")]
50 InvalidUpdateMessage(&'static str),
51 #[error("Incompatible encoding")]
52 IncompatibleEncoding,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
56#[serde(rename_all = "lowercase")]
57pub enum MessageParserEncoding {
58 Limited,
60 Prost,
62}
63
64#[derive(Debug, Clone, Copy)]
65pub enum MessageRef<'a> {
66 Slot(&'a MessageSlot),
67 Account(&'a MessageAccount),
68 Transaction(&'a MessageTransaction),
69 Entry(&'a MessageEntry),
70 BlockMeta(&'a MessageBlockMeta),
71 Block(&'a MessageBlock),
72}
73
74impl<'a> From<&'a Message> for MessageRef<'a> {
75 fn from(message: &'a Message) -> Self {
76 match message {
77 Message::Slot(msg) => Self::Slot(msg),
78 Message::Account(msg) => Self::Account(msg),
79 Message::Transaction(msg) => Self::Transaction(msg),
80 Message::Entry(msg) => Self::Entry(msg),
81 Message::BlockMeta(msg) => Self::BlockMeta(msg),
82 Message::Block(msg) => Self::Block(msg),
83 }
84 }
85}
86
87#[allow(clippy::large_enum_variant)]
88#[derive(Debug, Clone)]
89pub enum Message {
90 Slot(MessageSlot),
91 Account(MessageAccount),
92 Transaction(MessageTransaction),
93 Entry(MessageEntry),
94 BlockMeta(MessageBlockMeta),
95 Block(MessageBlock),
96}
97
98impl Message {
99 pub fn parse(
100 data: Cow<'_, [u8]>,
101 parser: MessageParserEncoding,
102 ) -> Result<Self, MessageParseError> {
103 match parser {
104 MessageParserEncoding::Limited => MessageParserLimited::parse(data),
105 MessageParserEncoding::Prost => MessageParserProst::parse(data),
106 }
107 }
108
109 pub fn create_block(
110 accounts: Vec<Arc<MessageAccount>>,
111 transactions: Vec<Arc<MessageTransaction>>,
112 entries: Vec<Arc<MessageEntry>>,
113 block_meta: Arc<MessageBlockMeta>,
114 created_at: impl Into<MessageBlockCreatedAt>,
115 ) -> Result<Self, MessageParseError> {
116 let created_at = created_at.into();
117 let created_at_encoding = created_at.encoding();
118
119 for encoding in std::iter::once(block_meta.encoding())
120 .chain(accounts.iter().map(|x| x.encoding()))
121 .chain(transactions.iter().map(|x| x.encoding()))
122 .chain(entries.iter().map(|x| x.encoding()))
123 {
124 if encoding != created_at_encoding {
125 return Err(MessageParseError::IncompatibleEncoding);
126 }
127 }
128
129 Ok(Self::Block(Self::unchecked_create_block(
130 accounts,
131 transactions,
132 entries,
133 block_meta,
134 created_at,
135 )))
136 }
137
138 pub const fn unchecked_create_block(
139 accounts: Vec<Arc<MessageAccount>>,
140 transactions: Vec<Arc<MessageTransaction>>,
141 entries: Vec<Arc<MessageEntry>>,
142 block_meta: Arc<MessageBlockMeta>,
143 created_at: MessageBlockCreatedAt,
144 ) -> MessageBlock {
145 MessageBlock {
146 accounts,
147 transactions,
148 entries,
149 block_meta,
150 created_at,
151 }
152 }
153
154 pub fn slot(&self) -> Slot {
155 match self {
156 Self::Slot(msg) => msg.slot(),
157 Self::Account(msg) => msg.slot(),
158 Self::Transaction(msg) => msg.slot(),
159 Self::Entry(msg) => msg.slot(),
160 Self::BlockMeta(msg) => msg.slot(),
161 Self::Block(msg) => msg.slot(),
162 }
163 }
164
165 pub const fn created_at(&self) -> MessageBlockCreatedAt {
166 match self {
167 Self::Slot(msg) => msg.created_at(),
168 Self::Account(msg) => msg.created_at(),
169 Self::Transaction(msg) => msg.created_at(),
170 Self::Entry(msg) => msg.created_at(),
171 Self::BlockMeta(msg) => msg.created_at(),
172 Self::Block(msg) => msg.created_at(),
173 }
174 }
175
176 pub fn size(&self) -> usize {
177 match self {
178 Self::Slot(msg) => msg.size(),
179 Self::Account(msg) => msg.size(),
180 Self::Transaction(msg) => msg.size(),
181 Self::Entry(msg) => msg.size(),
182 Self::BlockMeta(msg) => msg.size(),
183 Self::Block(msg) => msg.size(),
184 }
185 }
186}
187
188#[derive(Debug)]
189pub struct MessageParserLimited;
190
191impl MessageParserLimited {
192 pub fn parse(data: Cow<'_, [u8]>) -> Result<Message, MessageParseError> {
193 let data = data.into_owned();
194 let update = SubscribeUpdateLimitedDecode::decode(data.as_slice())?;
195 let created_at = update
196 .created_at
197 .ok_or(MessageParseError::FieldNotDefined("created_at"))?;
198
199 Ok(
200 match update
201 .update_oneof
202 .ok_or(MessageParseError::FieldNotDefined("update_oneof"))?
203 {
204 UpdateOneofLimitedDecode::Slot(range) => {
205 let message = UpdateOneofLimitedDecodeSlot::decode(
206 &data.as_slice()[range.start..range.end],
207 )?;
208 Message::Slot(MessageSlot::Limited {
209 slot: message.slot,
210 parent: message.parent,
211 status: SlotStatus::try_from(message.status)
212 .map_err(|_| MessageParseError::InvalidEnumValue(message.status))?,
213 dead_error: message.dead_error,
214 created_at,
215 buffer: data,
216 range,
217 })
218 }
219 UpdateOneofLimitedDecode::Account(range) => {
220 let message = UpdateOneofLimitedDecodeAccount::decode(
221 &data.as_slice()[range.start..range.end],
222 )?;
223
224 if message.account == usize::MAX {
225 return Err(MessageParseError::FieldNotDefined("account"));
226 }
227
228 let mut data_range = message.data;
229 data_range.start += range.start;
230 data_range.end += range.start;
231
232 Message::Account(MessageAccount::Limited {
233 pubkey: message.pubkey,
234 owner: message.owner,
235 lamports: message.lamports,
236 executable: message.executable,
237 rent_epoch: message.rent_epoch,
238 data: data_range,
239 txn_signature_offset: message
240 .txn_signature_offset
241 .map(|offset| offset + range.start),
242 write_version: message.write_version + range.start,
243 slot: message.slot,
244 is_startup: message.is_startup,
245 created_at,
246 buffer: data,
247 account_offset: message.account + range.start,
248 range,
249 })
250 }
251 UpdateOneofLimitedDecode::Transaction(range) => {
252 let message = UpdateOneofLimitedDecodeTransaction::decode(
253 &data.as_slice()[range.start..range.end],
254 )?;
255 let mut transaction_range = message
256 .transaction
257 .ok_or(MessageParseError::FieldNotDefined("transaction"))?;
258 transaction_range.start += range.start;
259 transaction_range.end += range.start;
260
261 let tx_info = UpdateOneofLimitedDecodeTransactionInfo::decode(
262 &data.as_slice()[transaction_range.start..transaction_range.end],
263 )?;
264
265 let error = tx_info
266 .err
267 .map(|err_range| {
268 TransactionError::decode(
269 &data.as_slice()[transaction_range.start + err_range.start
270 ..transaction_range.start + err_range.end],
271 )
272 })
273 .transpose()?;
274
275 Message::Transaction(MessageTransaction::Limited {
276 signature_offset: tx_info
277 .signature_offset
278 .map(|offset| transaction_range.start + offset),
279 error,
280 account_keys: tx_info.account_keys,
281 is_vote: tx_info.is_vote,
282 index: tx_info.index,
283 transaction_range,
284 transaction: OnceLock::new(),
285 slot: message.slot,
286 created_at,
287 buffer: data,
288 range,
289 })
290 }
291 UpdateOneofLimitedDecode::TransactionStatus(_) => {
292 return Err(MessageParseError::InvalidUpdateMessage("TransactionStatus"));
293 }
294 UpdateOneofLimitedDecode::Entry(range) => {
295 let entry = UpdateOneofLimitedDecodeEntry::decode(
296 &data.as_slice()[range.start..range.end],
297 )?;
298 Message::Entry(MessageEntry::Limited {
299 slot: entry.slot,
300 index: entry.index,
301 executed_transaction_count: entry.executed_transaction_count,
302 created_at,
303 buffer: data,
304 range,
305 })
306 }
307 UpdateOneofLimitedDecode::BlockMeta(range) => {
308 let block_meta =
309 SubscribeUpdateBlockMeta::decode(&data.as_slice()[range.start..range.end])?;
310
311 let block_height = block_meta
312 .block_height
313 .map(|v| v.block_height)
314 .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
315
316 Message::BlockMeta(MessageBlockMeta::Limited {
317 block_meta,
318 block_height,
319 created_at,
320 buffer: data,
321 range,
322 })
323 }
324 UpdateOneofLimitedDecode::Block(_) => {
325 return Err(MessageParseError::InvalidUpdateMessage("Block"));
326 }
327 UpdateOneofLimitedDecode::Ping(_) => {
328 return Err(MessageParseError::InvalidUpdateMessage("Ping"));
329 }
330 UpdateOneofLimitedDecode::Pong(_) => {
331 return Err(MessageParseError::InvalidUpdateMessage("Pong"));
332 }
333 },
334 )
335 }
336}
337
338#[derive(Debug)]
339pub struct MessageParserProst;
340
341impl MessageParserProst {
342 pub fn parse(data: Cow<'_, [u8]>) -> Result<Message, MessageParseError> {
343 let update = SubscribeUpdate::decode(data.deref())?;
344 let encoded_len = data.len();
345
346 let created_at = update
347 .created_at
348 .ok_or(MessageParseError::FieldNotDefined("created_at"))?;
349
350 Ok(
351 match update
352 .update_oneof
353 .ok_or(MessageParseError::FieldNotDefined("update_oneof"))?
354 {
355 UpdateOneof::Slot(message) => Message::Slot(MessageSlot::Prost {
356 slot: message.slot,
357 parent: message.parent,
358 status: SlotStatus::try_from(message.status)
359 .map_err(|_| MessageParseError::InvalidEnumValue(message.status))?,
360 dead_error: message.dead_error,
361 created_at,
362 size: encoded_len,
363 }),
364 UpdateOneof::Account(message) => {
365 let account = message
366 .account
367 .ok_or(MessageParseError::FieldNotDefined("account"))?;
368 Message::Account(MessageAccount::Prost {
369 pubkey: account
370 .pubkey
371 .as_slice()
372 .try_into()
373 .map_err(|_| MessageParseError::InvalidPubkey)?,
374 owner: account
375 .owner
376 .as_slice()
377 .try_into()
378 .map_err(|_| MessageParseError::InvalidPubkey)?,
379 account,
380 slot: message.slot,
381 is_startup: message.is_startup,
382 created_at,
383 size: PUBKEY_BYTES + PUBKEY_BYTES + encoded_len + 20,
384 })
385 }
386 UpdateOneof::Transaction(message) => {
387 let transaction = message
388 .transaction
389 .ok_or(MessageParseError::FieldNotDefined("transaction"))?;
390 let meta = transaction
391 .meta
392 .as_ref()
393 .ok_or(MessageParseError::FieldNotDefined("meta"))?;
394
395 let account_keys =
396 MessageTransaction::gen_account_keys_prost(&transaction, meta)?;
397 let account_keys_capacity = account_keys.capacity();
398
399 Message::Transaction(MessageTransaction::Prost {
400 error: meta.err.clone(),
401 account_keys,
402 transaction,
403 slot: message.slot,
404 created_at,
405 size: encoded_len + SIGNATURE_BYTES + account_keys_capacity * PUBKEY_BYTES,
406 })
407 }
408 UpdateOneof::TransactionStatus(_) => {
409 return Err(MessageParseError::InvalidUpdateMessage("TransactionStatus"));
410 }
411 UpdateOneof::Entry(entry) => Message::Entry(MessageEntry::Prost {
412 entry,
413 created_at,
414 size: encoded_len,
415 }),
416 UpdateOneof::BlockMeta(block_meta) => {
417 let block_height = block_meta
418 .block_height
419 .map(|v| v.block_height)
420 .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
421 Message::BlockMeta(MessageBlockMeta::Prost {
422 block_meta,
423 block_height,
424 created_at,
425 size: encoded_len,
426 })
427 }
428 UpdateOneof::Block(message) => {
429 let accounts = message
430 .accounts
431 .into_iter()
432 .map(|account| {
433 let encoded_len = account.encoded_len();
434 Ok(Arc::new(MessageAccount::Prost {
435 pubkey: account
436 .pubkey
437 .as_slice()
438 .try_into()
439 .map_err(|_| MessageParseError::InvalidPubkey)?,
440 owner: account
441 .owner
442 .as_slice()
443 .try_into()
444 .map_err(|_| MessageParseError::InvalidPubkey)?,
445 account,
446 slot: message.slot,
447 is_startup: false,
448 created_at,
449 size: PUBKEY_BYTES + PUBKEY_BYTES + encoded_len + 32,
450 }))
451 })
452 .collect::<Result<_, MessageParseError>>()?;
453
454 let transactions = message
455 .transactions
456 .into_iter()
457 .map(|transaction| {
458 let meta = transaction
459 .meta
460 .as_ref()
461 .ok_or(MessageParseError::FieldNotDefined("meta"))?;
462
463 let account_keys =
464 MessageTransaction::gen_account_keys_prost(&transaction, meta)?;
465 let account_keys_capacity = account_keys.capacity();
466
467 Ok(Arc::new(MessageTransaction::Prost {
468 error: meta.err.clone(),
469 account_keys,
470 transaction,
471 slot: message.slot,
472 created_at,
473 size: encoded_len
474 + SIGNATURE_BYTES
475 + account_keys_capacity * PUBKEY_BYTES,
476 }))
477 })
478 .collect::<Result<_, MessageParseError>>()?;
479
480 let entries = message
481 .entries
482 .into_iter()
483 .map(|entry| {
484 let encoded_len = entry.encoded_len();
485 Arc::new(MessageEntry::Prost {
486 entry,
487 created_at,
488 size: encoded_len,
489 })
490 })
491 .collect();
492
493 let block_meta = SubscribeUpdateBlockMeta {
494 slot: message.slot,
495 blockhash: message.blockhash,
496 rewards: message.rewards,
497 block_time: message.block_time,
498 block_height: message.block_height,
499 parent_slot: message.parent_slot,
500 parent_blockhash: message.parent_blockhash,
501 executed_transaction_count: message.executed_transaction_count,
502 entries_count: message.entries_count,
503 };
504 let encoded_len = block_meta.encoded_len();
505 let block_height = block_meta
506 .block_height
507 .map(|v| v.block_height)
508 .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
509
510 Message::Block(MessageBlock {
511 accounts,
512 transactions,
513 entries,
514 block_meta: Arc::new(MessageBlockMeta::Prost {
515 block_meta,
516 block_height,
517 created_at,
518 size: encoded_len,
519 }),
520 created_at: MessageBlockCreatedAt::Prost(created_at),
521 })
522 }
523 UpdateOneof::Ping(_) => {
524 return Err(MessageParseError::InvalidUpdateMessage("Ping"));
525 }
526 UpdateOneof::Pong(_) => {
527 return Err(MessageParseError::InvalidUpdateMessage("Pong"));
528 }
529 },
530 )
531 }
532}
533
534#[derive(Debug, Clone)]
535pub enum MessageSlot {
536 Limited {
537 slot: Slot,
538 parent: Option<Slot>,
539 status: SlotStatus,
540 dead_error: Option<Range<usize>>,
541 created_at: Timestamp,
542 buffer: Vec<u8>,
543 range: Range<usize>,
544 },
545 Prost {
546 slot: Slot,
547 parent: Option<Slot>,
548 status: SlotStatus,
549 dead_error: Option<String>,
550 created_at: Timestamp,
551 size: usize,
552 },
553}
554
555impl MessageSlot {
556 pub const fn encoding(&self) -> MessageParserEncoding {
557 match self {
558 Self::Limited { .. } => MessageParserEncoding::Limited,
559 Self::Prost { .. } => MessageParserEncoding::Prost,
560 }
561 }
562
563 pub const fn created_at(&self) -> MessageBlockCreatedAt {
564 match self {
565 Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
566 Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
567 }
568 }
569
570 pub const fn slot(&self) -> Slot {
571 match self {
572 Self::Limited { slot, .. } => *slot,
573 Self::Prost { slot, .. } => *slot,
574 }
575 }
576
577 pub fn size(&self) -> usize {
578 match self {
579 Self::Limited { buffer, .. } => buffer.len() + 64,
580 Self::Prost { size, .. } => *size,
581 }
582 }
583
584 pub const fn status(&self) -> SlotStatus {
585 match self {
586 Self::Limited { status, .. } => *status,
587 Self::Prost { status, .. } => *status,
588 }
589 }
590
591 pub const fn parent(&self) -> Option<Slot> {
592 match self {
593 Self::Limited { parent, .. } => *parent,
594 Self::Prost { parent, .. } => *parent,
595 }
596 }
597
598 pub fn dead_error(&self) -> Option<&str> {
599 match self {
600 Self::Limited {
601 dead_error, buffer, ..
602 } => dead_error.as_ref().map(|range| unsafe {
603 std::str::from_utf8_unchecked(&buffer.as_slice()[range.start..range.end])
604 }),
605 Self::Prost { dead_error, .. } => dead_error.as_deref(),
606 }
607 }
608}
609
610#[allow(clippy::large_enum_variant)]
611#[derive(Debug, Clone)]
612#[cfg_attr(test, derive(PartialEq))]
613pub enum MessageAccount {
614 Limited {
615 pubkey: Pubkey,
616 owner: Pubkey,
617 lamports: u64,
618 executable: bool,
619 rent_epoch: Epoch,
620 data: Range<usize>,
621 txn_signature_offset: Option<usize>,
622 write_version: usize,
623 slot: Slot,
624 is_startup: bool,
625 created_at: Timestamp,
626 buffer: Vec<u8>,
627 account_offset: usize,
628 range: Range<usize>,
629 },
630 Prost {
631 pubkey: Pubkey,
632 owner: Pubkey,
633 account: SubscribeUpdateAccountInfo,
634 slot: Slot,
635 is_startup: bool,
636 created_at: Timestamp,
637 size: usize,
638 },
639}
640
641impl MessageAccount {
642 pub const fn encoding(&self) -> MessageParserEncoding {
643 match self {
644 Self::Limited { .. } => MessageParserEncoding::Limited,
645 Self::Prost { .. } => MessageParserEncoding::Prost,
646 }
647 }
648
649 pub const fn slot(&self) -> Slot {
650 match self {
651 Self::Limited { slot, .. } => *slot,
652 Self::Prost { slot, .. } => *slot,
653 }
654 }
655
656 pub const fn created_at(&self) -> MessageBlockCreatedAt {
657 match self {
658 Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
659 Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
660 }
661 }
662
663 pub fn size(&self) -> usize {
664 match self {
665 Self::Limited { buffer, .. } => buffer.len() + PUBKEY_BYTES * 2 + 86,
666 Self::Prost { size, .. } => *size,
667 }
668 }
669
670 pub const fn pubkey(&self) -> &Pubkey {
671 match self {
672 Self::Limited { pubkey, .. } => pubkey,
673 Self::Prost { pubkey, .. } => pubkey,
674 }
675 }
676
677 pub fn write_version(&self) -> u64 {
678 match self {
679 Self::Limited {
680 write_version,
681 buffer,
682 ..
683 } => {
684 let mut buffer = &buffer.as_slice()[*write_version..];
685 decode_varint(&mut buffer).expect("already verified")
686 }
687 Self::Prost { account, .. } => account.write_version,
688 }
689 }
690
691 pub fn txn_signature(&self) -> Option<&[u8]> {
692 match self {
693 MessageAccount::Limited {
694 txn_signature_offset,
695 buffer,
696 ..
697 } => txn_signature_offset.map(|offset| &buffer.as_slice()[offset..offset + 64]),
698 MessageAccount::Prost { account, .. } => account.txn_signature.as_deref(),
699 }
700 }
701
702 pub const fn nonempty_txn_signature(&self) -> bool {
703 match self {
704 Self::Limited {
705 txn_signature_offset,
706 ..
707 } => txn_signature_offset.is_some(),
708 Self::Prost { account, .. } => account.txn_signature.is_some(),
709 }
710 }
711
712 pub fn update_write_version(&mut self, write_version: u64) {
713 match self {
714 Self::Limited {
715 buffer,
716 range,
717 account_offset,
718 write_version: write_version_current,
719 data,
720 txn_signature_offset,
721 ..
722 } => {
723 let mut buf = &mut &buffer.as_slice()[*write_version_current..];
725 let start = buf.remaining();
726 decode_varint(&mut buf).expect("already verified");
727 let wv_size_current = start - buf.remaining();
728 let wv_size_new = encoded_len_varint(write_version);
729
730 let mut buf = &mut &buffer.as_slice()[*account_offset..];
732 let start = buf.remaining();
733 let msg_size = decode_varint(&mut buf).expect("already verified");
734 let msg_size_current = start - buf.remaining();
735 let msg_size = msg_size + wv_size_new as u64 - wv_size_current as u64;
736 let msg_size_new = encoded_len_varint(msg_size);
737
738 let new_end =
740 range.end + msg_size_new - msg_size_current + wv_size_new - wv_size_current;
741 if new_end > buffer.len() {
742 buffer.resize(new_end, 0);
743 }
744
745 unsafe {
747 let end_current = *account_offset + msg_size_current;
748 let end_new = *account_offset + msg_size_new;
749 std::ptr::copy(
750 buffer.as_ptr().add(end_current),
751 buffer.as_mut_ptr().add(end_new),
752 *write_version_current - end_current,
753 );
754 }
755
756 let write_version_new = *write_version_current + msg_size_new - msg_size_current;
758 unsafe {
759 let end_current = *write_version_current + wv_size_current;
760 let end_new = write_version_new + wv_size_new;
761 std::ptr::copy(
762 buffer.as_ptr().add(end_current),
763 buffer.as_mut_ptr().add(end_new),
764 range.end - end_current,
765 );
766 }
767
768 encode_varint(msg_size, &mut &mut buffer.as_mut_slice()[*account_offset..]);
770 encode_varint(
771 write_version,
772 &mut &mut buffer.as_mut_slice()[write_version_new..],
773 );
774
775 range.end = new_end;
777 data.start = data.start + msg_size_new - msg_size_current;
779 data.end = data.end + msg_size_new - msg_size_current;
780 if data.start > write_version_new {
781 data.start = data.start + wv_size_new - wv_size_current;
782 data.end = data.end + wv_size_new - wv_size_current;
783 }
784 if let Some(txn_signature_offset) = txn_signature_offset {
785 *txn_signature_offset = *txn_signature_offset + msg_size_new - msg_size_current;
786 if *txn_signature_offset > write_version_new {
787 *txn_signature_offset =
788 *txn_signature_offset + wv_size_new - wv_size_current;
789 }
790 }
791 *write_version_current = write_version_new;
792 }
793 Self::Prost { account, size, .. } => {
794 *size = *size + encoded_len_varint(write_version)
795 - encoded_len_varint(account.write_version);
796 account.write_version = write_version;
797 }
798 }
799 }
800}
801
802impl ReadableAccount for MessageAccount {
803 fn lamports(&self) -> u64 {
804 match self {
805 Self::Limited { lamports, .. } => *lamports,
806 Self::Prost { account, .. } => account.lamports,
807 }
808 }
809
810 fn data(&self) -> &[u8] {
811 match self {
812 Self::Limited { data, buffer, .. } => &buffer.as_slice()[data.start..data.end],
813 Self::Prost { account, .. } => &account.data,
814 }
815 }
816
817 fn owner(&self) -> &Pubkey {
818 match self {
819 Self::Limited { owner, .. } => owner,
820 Self::Prost { owner, .. } => owner,
821 }
822 }
823
824 fn executable(&self) -> bool {
825 match self {
826 Self::Limited { executable, .. } => *executable,
827 Self::Prost { account, .. } => account.executable,
828 }
829 }
830
831 fn rent_epoch(&self) -> Epoch {
832 match self {
833 Self::Limited { rent_epoch, .. } => *rent_epoch,
834 Self::Prost { account, .. } => account.rent_epoch,
835 }
836 }
837}
838
839#[allow(clippy::large_enum_variant)]
840#[derive(Debug, Clone)]
841pub enum MessageTransaction {
842 Limited {
843 signature_offset: Option<usize>,
844 error: Option<TransactionError>,
845 account_keys: HashSet<Pubkey>,
846 is_vote: bool,
847 index: u64,
848 transaction_range: Range<usize>,
849 transaction: OnceLock<Option<SubscribeUpdateTransactionInfo>>,
850 slot: Slot,
851 created_at: Timestamp,
852 buffer: Vec<u8>,
853 range: Range<usize>,
854 },
855 Prost {
856 error: Option<TransactionError>,
857 account_keys: HashSet<Pubkey>,
858 transaction: SubscribeUpdateTransactionInfo,
859 slot: Slot,
860 created_at: Timestamp,
861 size: usize,
862 },
863}
864
865impl MessageTransaction {
866 pub const fn encoding(&self) -> MessageParserEncoding {
867 match self {
868 Self::Limited { .. } => MessageParserEncoding::Limited,
869 Self::Prost { .. } => MessageParserEncoding::Prost,
870 }
871 }
872
873 pub const fn slot(&self) -> Slot {
874 match self {
875 Self::Limited { slot, .. } => *slot,
876 Self::Prost { slot, .. } => *slot,
877 }
878 }
879
880 pub const fn created_at(&self) -> MessageBlockCreatedAt {
881 match self {
882 Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
883 Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
884 }
885 }
886
887 pub fn size(&self) -> usize {
888 match self {
889 Self::Limited {
890 account_keys,
891 buffer,
892 ..
893 } => buffer.len() * 2 + account_keys.capacity() * PUBKEY_BYTES,
894 Self::Prost { size, .. } => *size,
895 }
896 }
897
898 pub fn gen_account_keys_prost(
899 transaction: &SubscribeUpdateTransactionInfo,
900 meta: &TransactionStatusMeta,
901 ) -> Result<HashSet<Pubkey>, MessageParseError> {
902 let mut account_keys = HashSet::new();
903
904 if let Some(pubkeys) = transaction
906 .transaction
907 .as_ref()
908 .ok_or(MessageParseError::FieldNotDefined("transaction"))?
909 .message
910 .as_ref()
911 .map(|msg| msg.account_keys.as_slice())
912 {
913 for pubkey in pubkeys {
914 account_keys.insert(
915 Pubkey::try_from(pubkey.as_slice())
916 .map_err(|_| MessageParseError::InvalidPubkey)?,
917 );
918 }
919 }
920 for pubkey in meta.loaded_writable_addresses.iter() {
922 account_keys.insert(
923 Pubkey::try_from(pubkey.as_slice())
924 .map_err(|_| MessageParseError::InvalidPubkey)?,
925 );
926 }
927 for pubkey in meta.loaded_readonly_addresses.iter() {
928 account_keys.insert(
929 Pubkey::try_from(pubkey.as_slice())
930 .map_err(|_| MessageParseError::InvalidPubkey)?,
931 );
932 }
933
934 Ok(account_keys)
935 }
936
937 pub fn signature(&self) -> Signature {
938 Signature::from(
939 <[u8; SIGNATURE_BYTES]>::try_from(self.signature_ref())
940 .expect("signature must be 64 bytes"),
941 )
942 }
943
944 pub fn signature_ref(&self) -> &[u8] {
945 match self {
946 Self::Limited {
947 signature_offset,
948 buffer,
949 ..
950 } => match signature_offset {
951 Some(offset) => &buffer[*offset..*offset + SIGNATURE_BYTES],
952 None => panic!("transaction should have valid signature"),
953 },
954 Self::Prost { transaction, .. } => &transaction.signature,
955 }
956 }
957
958 pub const fn vote(&self) -> bool {
959 match self {
960 Self::Limited { is_vote, .. } => *is_vote,
961 Self::Prost { transaction, .. } => transaction.is_vote,
962 }
963 }
964
965 pub const fn index(&self) -> u64 {
966 match self {
967 Self::Limited { index, .. } => *index,
968 Self::Prost { transaction, .. } => transaction.index,
969 }
970 }
971
972 pub const fn failed(&self) -> bool {
973 match self {
974 Self::Limited { error, .. } => error.is_some(),
975 Self::Prost { error, .. } => error.is_some(),
976 }
977 }
978
979 pub const fn error(&self) -> &Option<TransactionError> {
980 match self {
981 Self::Limited { error, .. } => error,
982 Self::Prost { error, .. } => error,
983 }
984 }
985
986 fn transaction(&self) -> Result<&SubscribeUpdateTransactionInfo, &'static str> {
987 match self {
988 Self::Limited {
989 transaction,
990 transaction_range,
991 buffer,
992 ..
993 } => transaction
994 .get_or_init(|| {
995 SubscribeUpdateTransactionInfo::decode(
996 &buffer.as_slice()[transaction_range.clone()],
997 )
998 .ok()
999 })
1000 .as_ref()
1001 .ok_or("FailedToDecode"),
1002 Self::Prost { transaction, .. } => Ok(transaction),
1003 }
1004 }
1005
1006 pub fn transaction_meta(&self) -> Result<&TransactionStatusMeta, &'static str> {
1007 self.transaction()
1008 .and_then(|tx| tx.meta.as_ref().ok_or("FieldNotDefined"))
1009 }
1010
1011 pub fn as_versioned_transaction_with_status_meta(
1012 &self,
1013 ) -> Result<VersionedTransactionWithStatusMeta, &'static str> {
1014 Ok(VersionedTransactionWithStatusMeta {
1015 transaction: convert_from::create_tx_versioned(
1016 self.transaction()?
1017 .transaction
1018 .clone()
1019 .ok_or("FieldNotDefined")?,
1020 )?,
1021 meta: convert_from::create_tx_meta(self.transaction_meta()?.clone())?,
1022 })
1023 }
1024
1025 pub const fn account_keys(&self) -> &HashSet<Pubkey> {
1026 match self {
1027 Self::Limited { account_keys, .. } => account_keys,
1028 Self::Prost { account_keys, .. } => account_keys,
1029 }
1030 }
1031}
1032
1033#[derive(Debug, Clone)]
1034pub enum MessageEntry {
1035 Limited {
1036 slot: Slot,
1037 index: u64,
1038 executed_transaction_count: u64,
1039 created_at: Timestamp,
1040 buffer: Vec<u8>,
1041 range: Range<usize>,
1042 },
1043 Prost {
1044 entry: SubscribeUpdateEntry,
1045 created_at: Timestamp,
1046 size: usize,
1047 },
1048}
1049
1050impl MessageEntry {
1051 pub const fn encoding(&self) -> MessageParserEncoding {
1052 match self {
1053 Self::Limited { .. } => MessageParserEncoding::Limited,
1054 Self::Prost { .. } => MessageParserEncoding::Prost,
1055 }
1056 }
1057
1058 pub const fn slot(&self) -> Slot {
1059 match self {
1060 Self::Limited { slot, .. } => *slot,
1061 Self::Prost { entry, .. } => entry.slot,
1062 }
1063 }
1064
1065 pub const fn created_at(&self) -> MessageBlockCreatedAt {
1066 match self {
1067 Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
1068 Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
1069 }
1070 }
1071
1072 pub fn size(&self) -> usize {
1073 match self {
1074 Self::Limited { buffer, .. } => buffer.len() + 52,
1075 Self::Prost { size, .. } => *size,
1076 }
1077 }
1078
1079 pub const fn index(&self) -> u64 {
1080 match self {
1081 Self::Limited { index, .. } => *index,
1082 Self::Prost { entry, .. } => entry.index,
1083 }
1084 }
1085
1086 pub const fn executed_transaction_count(&self) -> u64 {
1087 match self {
1088 Self::Limited {
1089 executed_transaction_count,
1090 ..
1091 } => *executed_transaction_count,
1092 Self::Prost { entry, .. } => entry.executed_transaction_count,
1093 }
1094 }
1095}
1096
1097#[derive(Debug, Clone)]
1098pub enum MessageBlockMeta {
1099 Limited {
1100 block_meta: SubscribeUpdateBlockMeta,
1101 block_height: Slot,
1102 created_at: Timestamp,
1103 buffer: Vec<u8>,
1104 range: Range<usize>,
1105 },
1106 Prost {
1107 block_meta: SubscribeUpdateBlockMeta,
1108 block_height: Slot,
1109 created_at: Timestamp,
1110 size: usize,
1111 },
1112}
1113
1114impl MessageBlockMeta {
1115 pub const fn encoding(&self) -> MessageParserEncoding {
1116 match self {
1117 Self::Limited { .. } => MessageParserEncoding::Limited,
1118 Self::Prost { .. } => MessageParserEncoding::Prost,
1119 }
1120 }
1121
1122 pub const fn slot(&self) -> Slot {
1123 match self {
1124 Self::Limited { block_meta, .. } => block_meta.slot,
1125 Self::Prost { block_meta, .. } => block_meta.slot,
1126 }
1127 }
1128
1129 pub const fn created_at(&self) -> MessageBlockCreatedAt {
1130 match self {
1131 Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
1132 Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
1133 }
1134 }
1135
1136 pub fn size(&self) -> usize {
1137 match self {
1138 Self::Limited { buffer, .. } => buffer.len() * 2,
1139 Self::Prost { size, .. } => *size,
1140 }
1141 }
1142
1143 #[allow(clippy::missing_const_for_fn)]
1145 pub fn blockhash(&self) -> &str {
1146 match self {
1147 Self::Limited { block_meta, .. } => &block_meta.blockhash,
1148 Self::Prost { block_meta, .. } => &block_meta.blockhash,
1149 }
1150 }
1151
1152 pub const fn block_height(&self) -> Slot {
1153 match self {
1154 Self::Limited { block_height, .. } => *block_height,
1155 Self::Prost { block_height, .. } => *block_height,
1156 }
1157 }
1158
1159 pub const fn executed_transaction_count(&self) -> u64 {
1160 match self {
1161 Self::Limited { block_meta, .. } => block_meta.executed_transaction_count,
1162 Self::Prost { block_meta, .. } => block_meta.executed_transaction_count,
1163 }
1164 }
1165
1166 pub const fn entries_count(&self) -> u64 {
1167 match self {
1168 Self::Limited { block_meta, .. } => block_meta.entries_count,
1169 Self::Prost { block_meta, .. } => block_meta.entries_count,
1170 }
1171 }
1172}
1173
1174#[derive(Debug, Clone)]
1175pub struct MessageBlock {
1176 pub accounts: Vec<Arc<MessageAccount>>,
1177 pub transactions: Vec<Arc<MessageTransaction>>,
1178 pub entries: Vec<Arc<MessageEntry>>,
1179 pub block_meta: Arc<MessageBlockMeta>,
1180 pub created_at: MessageBlockCreatedAt,
1181}
1182
1183impl MessageBlock {
1184 pub const fn encoding(&self) -> MessageParserEncoding {
1185 self.created_at.encoding()
1186 }
1187
1188 pub fn slot(&self) -> Slot {
1189 self.block_meta.as_ref().slot()
1190 }
1191
1192 pub const fn created_at(&self) -> MessageBlockCreatedAt {
1193 self.created_at
1194 }
1195
1196 pub fn size(&self) -> usize {
1197 self.accounts
1198 .iter()
1199 .map(|m| m.size())
1200 .chain(self.transactions.iter().map(|m| m.size()))
1201 .chain(self.entries.iter().map(|m| m.size()))
1202 .sum::<usize>()
1203 + self.block_meta.size()
1204 }
1205
1206 pub fn as_confirmed_block(&self) -> Result<ConfirmedBlock, &'static str> {
1207 Ok(match self.block_meta.as_ref() {
1208 MessageBlockMeta::Limited { block_meta, .. } => ConfirmedBlock {
1209 previous_blockhash: block_meta.parent_blockhash.clone(),
1210 blockhash: block_meta.blockhash.clone(),
1211 parent_slot: block_meta.parent_slot,
1212 transactions: self
1213 .transactions
1214 .iter()
1215 .map(|tx| {
1216 tx.as_versioned_transaction_with_status_meta()
1217 .map(TransactionWithStatusMeta::Complete)
1218 })
1219 .collect::<Result<Vec<_>, _>>()?,
1220 rewards: block_meta
1221 .rewards
1222 .as_ref()
1223 .map(|r| {
1224 r.rewards
1225 .iter()
1226 .cloned()
1227 .map(convert_from::create_reward)
1228 .collect::<Result<Vec<_>, _>>()
1229 })
1230 .transpose()?
1231 .unwrap_or_default(),
1232 num_partitions: block_meta
1233 .rewards
1234 .as_ref()
1235 .and_then(|r| r.num_partitions)
1236 .map(|np| np.num_partitions),
1237 block_time: block_meta.block_time.map(|bt| bt.timestamp),
1238 block_height: block_meta.block_height.map(|bh| bh.block_height),
1239 },
1240 MessageBlockMeta::Prost { block_meta, .. } => ConfirmedBlock {
1241 previous_blockhash: block_meta.parent_blockhash.clone(),
1242 blockhash: block_meta.blockhash.clone(),
1243 parent_slot: block_meta.parent_slot,
1244 transactions: self
1245 .transactions
1246 .iter()
1247 .map(|tx| {
1248 tx.as_versioned_transaction_with_status_meta()
1249 .map(TransactionWithStatusMeta::Complete)
1250 })
1251 .collect::<Result<Vec<_>, _>>()?,
1252 rewards: block_meta
1253 .rewards
1254 .as_ref()
1255 .map(|r| {
1256 r.rewards
1257 .iter()
1258 .cloned()
1259 .map(convert_from::create_reward)
1260 .collect::<Result<Vec<_>, _>>()
1261 })
1262 .transpose()?
1263 .unwrap_or_default(),
1264 num_partitions: block_meta
1265 .rewards
1266 .as_ref()
1267 .and_then(|r| r.num_partitions)
1268 .map(|np| np.num_partitions),
1269 block_time: block_meta.block_time.map(|bt| bt.timestamp),
1270 block_height: block_meta.block_height.map(|bh| bh.block_height),
1271 },
1272 })
1273 }
1274}
1275
1276#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1277pub enum MessageBlockCreatedAt {
1278 Limited(Timestamp),
1279 Prost(Timestamp),
1280}
1281
1282impl From<MessageBlockCreatedAt> for Timestamp {
1283 fn from(value: MessageBlockCreatedAt) -> Self {
1284 match value {
1285 MessageBlockCreatedAt::Limited(timestamp) => timestamp,
1286 MessageBlockCreatedAt::Prost(timestamp) => timestamp,
1287 }
1288 }
1289}
1290
1291impl MessageBlockCreatedAt {
1292 pub const fn encoding(&self) -> MessageParserEncoding {
1293 match self {
1294 Self::Limited(_) => MessageParserEncoding::Limited,
1295 Self::Prost(_) => MessageParserEncoding::Prost,
1296 }
1297 }
1298
1299 pub const fn as_millis(&self) -> u64 {
1300 match self {
1301 Self::Limited(ts) => ts.seconds as u64 * 1_000 + (ts.nanos / 1_000_000) as u64,
1302 Self::Prost(ts) => ts.seconds as u64 * 1_000 + (ts.nanos / 1_000_000) as u64,
1303 }
1304 }
1305}
1306
1307#[cfg(test)]
1308mod tests {
1309 use {
1310 super::{Message, MessageAccount, MessageParserEncoding, MessageRef},
1311 crate::{
1312 config::{ConfigFilter, ConfigFilterAccounts, ConfigFilterAccountsDataSlice},
1313 filter::Filter,
1314 },
1315 maplit::hashmap,
1316 solana_account::ReadableAccount,
1317 solana_commitment_config::CommitmentLevel,
1318 };
1319
1320 static MESSAGE: &str = "0a0012af010aa6010a2088f1ffa3a2dfe617bdc4e3573251a322e3fcae81e5a457390e64751c00a465e210e0d54a1a2006aa09548b50476ad462f91f89a3015033264fc9abd5270020a9d142334742fb28ffffffffffffffffff013208c921f474e044612838e3e1acc2b53042405bd620fab28d3c0b78b3ead9f04d1c4d6dffeac4ffa7c679a6570b0226557c10b4c4016d937e06044b4e49d9d7916524d5dfa26297c5f638c3d11f846410bc0510e5ddaca2015a0c08e1c79ec10610ebef838601";
1321
1322 fn parse(data: Vec<u8>, parser: MessageParserEncoding) -> MessageAccount {
1323 if let Message::Account(msg) = Message::parse(data.into(), parser).expect("valid message") {
1324 assert!(
1325 match parser {
1326 MessageParserEncoding::Prost => matches!(msg, MessageAccount::Prost { .. }),
1327 MessageParserEncoding::Limited => matches!(msg, MessageAccount::Limited { .. }),
1328 },
1329 "unexpected msg encoding"
1330 );
1331
1332 msg
1333 } else {
1334 panic!("expected account message");
1335 }
1336 }
1337
1338 fn encode(msg: &MessageAccount, data_slice: Option<(u64, u64)>) -> Vec<u8> {
1339 let filter = Filter::new(&ConfigFilter {
1340 accounts: hashmap! { "".to_owned() => ConfigFilterAccounts::default() },
1341 accounts_data_slice: data_slice
1342 .map(|(offset, length)| vec![ConfigFilterAccountsDataSlice { offset, length }])
1343 .unwrap_or_default(),
1344 ..Default::default()
1345 });
1346
1347 let message = Message::Account(msg.clone());
1348 let message_ref: MessageRef = (&message).into();
1349
1350 let updates = filter.get_updates_ref(message_ref, CommitmentLevel::Processed);
1351 assert_eq!(updates.len(), 1, "unexpected number of updates");
1352 updates[0].encode_to_vec()
1353 }
1354
1355 #[test]
1356 fn test_limited() {
1357 let mut msg = parse(
1358 const_hex::decode(MESSAGE).expect("valid hex"),
1359 MessageParserEncoding::Limited,
1360 );
1361 assert_eq!(msg.write_version(), 1663633666275, "valid write version");
1362
1363 msg.update_write_version(1);
1364 assert_eq!(msg.write_version(), 1, "dec valid write version");
1365 let mut msg2 = parse(encode(&msg, None), MessageParserEncoding::Limited);
1366 if let (
1367 MessageAccount::Limited { buffer, .. },
1368 MessageAccount::Limited {
1369 buffer: buffer2, ..
1370 },
1371 ) = (&msg, &mut msg2)
1372 {
1373 *buffer2 = buffer.clone(); }
1375 assert_eq!(msg, msg2, "write version update failed");
1376 let mut msg2 = parse(encode(&msg, Some((1, 3))), MessageParserEncoding::Limited);
1378 assert_eq!(msg.write_version(), msg2.write_version());
1379 assert_eq!(&msg.data()[1..4], msg2.data());
1380 if let (
1381 MessageAccount::Limited {
1382 buffer,
1383 txn_signature_offset,
1384 write_version,
1385 data,
1386 range,
1387 ..
1388 },
1389 MessageAccount::Limited {
1390 buffer: buffer2,
1391 txn_signature_offset: txn_signature_offset2,
1392 write_version: write_version2,
1393 data: data2,
1394 range: range2,
1395 ..
1396 },
1397 ) = (&msg, &mut msg2)
1398 {
1399 let txn_offset = txn_signature_offset.unwrap();
1400 let txn_offset2 = txn_signature_offset2.unwrap();
1401 assert_eq!(
1402 &buffer[txn_offset..txn_offset + 64],
1403 &buffer2[txn_offset2..txn_offset2 + 64]
1404 );
1405 *buffer2 = buffer.clone(); *txn_signature_offset2 = *txn_signature_offset;
1407 *write_version2 = *write_version;
1408 *data2 = data.clone();
1409 *range2 = range.clone();
1410 }
1411 assert_eq!(msg, msg2, "write version update failed");
1412
1413 msg.update_write_version(u64::MAX);
1414 assert_eq!(msg.write_version(), u64::MAX, "inc valid write version");
1415 let mut msg2 = parse(encode(&msg, None), MessageParserEncoding::Limited);
1416 if let (
1417 MessageAccount::Limited { buffer, .. },
1418 MessageAccount::Limited {
1419 buffer: buffer2, ..
1420 },
1421 ) = (&msg, &mut msg2)
1422 {
1423 *buffer2 = buffer.clone(); }
1425 assert_eq!(msg, msg2, "write version update failed");
1426 let mut msg2 = parse(encode(&msg, Some((1, 3))), MessageParserEncoding::Limited);
1428 assert_eq!(msg.write_version(), msg2.write_version());
1429 assert_eq!(&msg.data()[1..4], msg2.data());
1430 if let (
1431 MessageAccount::Limited {
1432 buffer,
1433 txn_signature_offset,
1434 write_version,
1435 data,
1436 range,
1437 ..
1438 },
1439 MessageAccount::Limited {
1440 buffer: buffer2,
1441 txn_signature_offset: txn_signature_offset2,
1442 write_version: write_version2,
1443 data: data2,
1444 range: range2,
1445 ..
1446 },
1447 ) = (&msg, &mut msg2)
1448 {
1449 let txn_offset = txn_signature_offset.unwrap();
1450 let txn_offset2 = txn_signature_offset2.unwrap();
1451 assert_eq!(
1452 &buffer[txn_offset..txn_offset + 64],
1453 &buffer2[txn_offset2..txn_offset2 + 64]
1454 );
1455 *buffer2 = buffer.clone(); *txn_signature_offset2 = *txn_signature_offset;
1457 *write_version2 = *write_version;
1458 *data2 = data.clone();
1459 *range2 = range.clone();
1460 }
1461 assert_eq!(msg, msg2, "write version update failed");
1462 }
1463
1464 #[test]
1465 fn test_prost() {
1466 let mut msg = parse(
1467 const_hex::decode(MESSAGE).expect("valid hex"),
1468 MessageParserEncoding::Prost,
1469 );
1470 assert_eq!(msg.write_version(), 1663633666275, "valid write version");
1471
1472 msg.update_write_version(1);
1473 assert_eq!(msg.write_version(), 1, "dec valid write version");
1474 let msg2 = parse(encode(&msg, None), MessageParserEncoding::Prost);
1475 assert_eq!(msg, msg2, "write version update failed");
1476
1477 msg.update_write_version(u64::MAX);
1478 assert_eq!(msg.write_version(), u64::MAX, "inc valid write version");
1479 let msg2 = parse(encode(&msg, None), MessageParserEncoding::Prost);
1480 assert_eq!(msg, msg2, "write version update failed");
1481 }
1482}