Skip to main content

richat_filter/
message.rs

1use {
2    crate::protobuf::decode::{
3        LimitedDecode, SubscribeUpdateLimitedDecode, UpdateOneofLimitedDecode,
4        UpdateOneofLimitedDecodeAccount, UpdateOneofLimitedDecodeEntry,
5        UpdateOneofLimitedDecodeSlot, UpdateOneofLimitedDecodeTransaction,
6        UpdateOneofLimitedDecodeTransactionInfo,
7    },
8    prost::{
9        Message as _,
10        bytes::Buf,
11        encoding::{decode_varint, encode_varint, encoded_len_varint},
12    },
13    prost_types::Timestamp,
14    richat_proto::{
15        convert_from,
16        geyser::{
17            SlotStatus, SubscribeUpdate, SubscribeUpdateAccountInfo, SubscribeUpdateBlockMeta,
18            SubscribeUpdateEntry, SubscribeUpdateTransactionInfo, subscribe_update::UpdateOneof,
19        },
20        solana::storage::confirmed_block::{TransactionError, TransactionStatusMeta},
21    },
22    serde::{Deserialize, Serialize},
23    solana_account::ReadableAccount,
24    solana_clock::{Epoch, Slot},
25    solana_pubkey::{PUBKEY_BYTES, Pubkey},
26    solana_signature::{SIGNATURE_BYTES, Signature},
27    solana_transaction_status::{
28        ConfirmedBlock, TransactionWithStatusMeta, VersionedTransactionWithStatusMeta,
29    },
30    std::{
31        borrow::Cow,
32        collections::HashSet,
33        ops::{Deref, Range},
34        sync::{Arc, OnceLock},
35    },
36    thiserror::Error,
37};
38
39#[derive(Debug, Error)]
40pub enum MessageParseError {
41    #[error(transparent)]
42    Prost(#[from] prost::DecodeError),
43    #[error("Field `{0}` should be defined")]
44    FieldNotDefined(&'static str),
45    #[error("Invalid enum value: {0}")]
46    InvalidEnumValue(i32),
47    #[error("Invalid pubkey length")]
48    InvalidPubkey,
49    #[error("Invalid update: {0}")]
50    InvalidUpdateMessage(&'static str),
51    #[error("Incompatible encoding")]
52    IncompatibleEncoding,
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
56#[serde(rename_all = "lowercase")]
57pub enum MessageParserEncoding {
58    /// Use optimized parser to extract only required fields
59    Limited,
60    /// Parse full message with `prost`
61    Prost,
62}
63
64#[derive(Debug, Clone, Copy)]
65pub enum MessageRef<'a> {
66    Slot(&'a MessageSlot),
67    Account(&'a MessageAccount),
68    Transaction(&'a MessageTransaction),
69    Entry(&'a MessageEntry),
70    BlockMeta(&'a MessageBlockMeta),
71    Block(&'a MessageBlock),
72}
73
74impl<'a> From<&'a Message> for MessageRef<'a> {
75    fn from(message: &'a Message) -> Self {
76        match message {
77            Message::Slot(msg) => Self::Slot(msg),
78            Message::Account(msg) => Self::Account(msg),
79            Message::Transaction(msg) => Self::Transaction(msg),
80            Message::Entry(msg) => Self::Entry(msg),
81            Message::BlockMeta(msg) => Self::BlockMeta(msg),
82            Message::Block(msg) => Self::Block(msg),
83        }
84    }
85}
86
87#[allow(clippy::large_enum_variant)]
88#[derive(Debug, Clone)]
89pub enum Message {
90    Slot(MessageSlot),
91    Account(MessageAccount),
92    Transaction(MessageTransaction),
93    Entry(MessageEntry),
94    BlockMeta(MessageBlockMeta),
95    Block(MessageBlock),
96}
97
98impl Message {
99    pub fn parse(
100        data: Cow<'_, [u8]>,
101        parser: MessageParserEncoding,
102    ) -> Result<Self, MessageParseError> {
103        match parser {
104            MessageParserEncoding::Limited => MessageParserLimited::parse(data),
105            MessageParserEncoding::Prost => MessageParserProst::parse(data),
106        }
107    }
108
109    pub fn create_block(
110        accounts: Vec<Arc<MessageAccount>>,
111        transactions: Vec<Arc<MessageTransaction>>,
112        entries: Vec<Arc<MessageEntry>>,
113        block_meta: Arc<MessageBlockMeta>,
114        created_at: impl Into<MessageBlockCreatedAt>,
115    ) -> Result<Self, MessageParseError> {
116        let created_at = created_at.into();
117        let created_at_encoding = created_at.encoding();
118
119        for encoding in std::iter::once(block_meta.encoding())
120            .chain(accounts.iter().map(|x| x.encoding()))
121            .chain(transactions.iter().map(|x| x.encoding()))
122            .chain(entries.iter().map(|x| x.encoding()))
123        {
124            if encoding != created_at_encoding {
125                return Err(MessageParseError::IncompatibleEncoding);
126            }
127        }
128
129        Ok(Self::Block(Self::unchecked_create_block(
130            accounts,
131            transactions,
132            entries,
133            block_meta,
134            created_at,
135        )))
136    }
137
138    pub const fn unchecked_create_block(
139        accounts: Vec<Arc<MessageAccount>>,
140        transactions: Vec<Arc<MessageTransaction>>,
141        entries: Vec<Arc<MessageEntry>>,
142        block_meta: Arc<MessageBlockMeta>,
143        created_at: MessageBlockCreatedAt,
144    ) -> MessageBlock {
145        MessageBlock {
146            accounts,
147            transactions,
148            entries,
149            block_meta,
150            created_at,
151        }
152    }
153
154    pub fn slot(&self) -> Slot {
155        match self {
156            Self::Slot(msg) => msg.slot(),
157            Self::Account(msg) => msg.slot(),
158            Self::Transaction(msg) => msg.slot(),
159            Self::Entry(msg) => msg.slot(),
160            Self::BlockMeta(msg) => msg.slot(),
161            Self::Block(msg) => msg.slot(),
162        }
163    }
164
165    pub const fn created_at(&self) -> MessageBlockCreatedAt {
166        match self {
167            Self::Slot(msg) => msg.created_at(),
168            Self::Account(msg) => msg.created_at(),
169            Self::Transaction(msg) => msg.created_at(),
170            Self::Entry(msg) => msg.created_at(),
171            Self::BlockMeta(msg) => msg.created_at(),
172            Self::Block(msg) => msg.created_at(),
173        }
174    }
175
176    pub fn size(&self) -> usize {
177        match self {
178            Self::Slot(msg) => msg.size(),
179            Self::Account(msg) => msg.size(),
180            Self::Transaction(msg) => msg.size(),
181            Self::Entry(msg) => msg.size(),
182            Self::BlockMeta(msg) => msg.size(),
183            Self::Block(msg) => msg.size(),
184        }
185    }
186}
187
188#[derive(Debug)]
189pub struct MessageParserLimited;
190
191impl MessageParserLimited {
192    pub fn parse(data: Cow<'_, [u8]>) -> Result<Message, MessageParseError> {
193        let data = data.into_owned();
194        let update = SubscribeUpdateLimitedDecode::decode(data.as_slice())?;
195        let created_at = update
196            .created_at
197            .ok_or(MessageParseError::FieldNotDefined("created_at"))?;
198
199        Ok(
200            match update
201                .update_oneof
202                .ok_or(MessageParseError::FieldNotDefined("update_oneof"))?
203            {
204                UpdateOneofLimitedDecode::Slot(range) => {
205                    let message = UpdateOneofLimitedDecodeSlot::decode(
206                        &data.as_slice()[range.start..range.end],
207                    )?;
208                    Message::Slot(MessageSlot::Limited {
209                        slot: message.slot,
210                        parent: message.parent,
211                        status: SlotStatus::try_from(message.status)
212                            .map_err(|_| MessageParseError::InvalidEnumValue(message.status))?,
213                        dead_error: message.dead_error,
214                        created_at,
215                        buffer: data,
216                        range,
217                    })
218                }
219                UpdateOneofLimitedDecode::Account(range) => {
220                    let message = UpdateOneofLimitedDecodeAccount::decode(
221                        &data.as_slice()[range.start..range.end],
222                    )?;
223
224                    if message.account == usize::MAX {
225                        return Err(MessageParseError::FieldNotDefined("account"));
226                    }
227
228                    let mut data_range = message.data;
229                    data_range.start += range.start;
230                    data_range.end += range.start;
231
232                    Message::Account(MessageAccount::Limited {
233                        pubkey: message.pubkey,
234                        owner: message.owner,
235                        lamports: message.lamports,
236                        executable: message.executable,
237                        rent_epoch: message.rent_epoch,
238                        data: data_range,
239                        txn_signature_offset: message
240                            .txn_signature_offset
241                            .map(|offset| offset + range.start),
242                        write_version: message.write_version + range.start,
243                        slot: message.slot,
244                        is_startup: message.is_startup,
245                        created_at,
246                        buffer: data,
247                        account_offset: message.account + range.start,
248                        range,
249                    })
250                }
251                UpdateOneofLimitedDecode::Transaction(range) => {
252                    let message = UpdateOneofLimitedDecodeTransaction::decode(
253                        &data.as_slice()[range.start..range.end],
254                    )?;
255                    let mut transaction_range = message
256                        .transaction
257                        .ok_or(MessageParseError::FieldNotDefined("transaction"))?;
258                    transaction_range.start += range.start;
259                    transaction_range.end += range.start;
260
261                    let tx_info = UpdateOneofLimitedDecodeTransactionInfo::decode(
262                        &data.as_slice()[transaction_range.start..transaction_range.end],
263                    )?;
264
265                    let error = tx_info
266                        .err
267                        .map(|err_range| {
268                            TransactionError::decode(
269                                &data.as_slice()[transaction_range.start + err_range.start
270                                    ..transaction_range.start + err_range.end],
271                            )
272                        })
273                        .transpose()?;
274
275                    Message::Transaction(MessageTransaction::Limited {
276                        signature_offset: tx_info
277                            .signature_offset
278                            .map(|offset| transaction_range.start + offset),
279                        error,
280                        account_keys: tx_info.account_keys,
281                        is_vote: tx_info.is_vote,
282                        index: tx_info.index,
283                        transaction_range,
284                        transaction: OnceLock::new(),
285                        slot: message.slot,
286                        created_at,
287                        buffer: data,
288                        range,
289                    })
290                }
291                UpdateOneofLimitedDecode::TransactionStatus(_) => {
292                    return Err(MessageParseError::InvalidUpdateMessage("TransactionStatus"));
293                }
294                UpdateOneofLimitedDecode::Entry(range) => {
295                    let entry = UpdateOneofLimitedDecodeEntry::decode(
296                        &data.as_slice()[range.start..range.end],
297                    )?;
298                    Message::Entry(MessageEntry::Limited {
299                        slot: entry.slot,
300                        index: entry.index,
301                        executed_transaction_count: entry.executed_transaction_count,
302                        created_at,
303                        buffer: data,
304                        range,
305                    })
306                }
307                UpdateOneofLimitedDecode::BlockMeta(range) => {
308                    let block_meta =
309                        SubscribeUpdateBlockMeta::decode(&data.as_slice()[range.start..range.end])?;
310
311                    let block_height = block_meta
312                        .block_height
313                        .map(|v| v.block_height)
314                        .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
315
316                    Message::BlockMeta(MessageBlockMeta::Limited {
317                        block_meta,
318                        block_height,
319                        created_at,
320                        buffer: data,
321                        range,
322                    })
323                }
324                UpdateOneofLimitedDecode::Block(_) => {
325                    return Err(MessageParseError::InvalidUpdateMessage("Block"));
326                }
327                UpdateOneofLimitedDecode::Ping(_) => {
328                    return Err(MessageParseError::InvalidUpdateMessage("Ping"));
329                }
330                UpdateOneofLimitedDecode::Pong(_) => {
331                    return Err(MessageParseError::InvalidUpdateMessage("Pong"));
332                }
333            },
334        )
335    }
336}
337
338#[derive(Debug)]
339pub struct MessageParserProst;
340
341impl MessageParserProst {
342    pub fn parse(data: Cow<'_, [u8]>) -> Result<Message, MessageParseError> {
343        let update = SubscribeUpdate::decode(data.deref())?;
344        let encoded_len = data.len();
345
346        let created_at = update
347            .created_at
348            .ok_or(MessageParseError::FieldNotDefined("created_at"))?;
349
350        Ok(
351            match update
352                .update_oneof
353                .ok_or(MessageParseError::FieldNotDefined("update_oneof"))?
354            {
355                UpdateOneof::Slot(message) => Message::Slot(MessageSlot::Prost {
356                    slot: message.slot,
357                    parent: message.parent,
358                    status: SlotStatus::try_from(message.status)
359                        .map_err(|_| MessageParseError::InvalidEnumValue(message.status))?,
360                    dead_error: message.dead_error,
361                    created_at,
362                    size: encoded_len,
363                }),
364                UpdateOneof::Account(message) => {
365                    let account = message
366                        .account
367                        .ok_or(MessageParseError::FieldNotDefined("account"))?;
368                    Message::Account(MessageAccount::Prost {
369                        pubkey: account
370                            .pubkey
371                            .as_slice()
372                            .try_into()
373                            .map_err(|_| MessageParseError::InvalidPubkey)?,
374                        owner: account
375                            .owner
376                            .as_slice()
377                            .try_into()
378                            .map_err(|_| MessageParseError::InvalidPubkey)?,
379                        account,
380                        slot: message.slot,
381                        is_startup: message.is_startup,
382                        created_at,
383                        size: PUBKEY_BYTES + PUBKEY_BYTES + encoded_len + 20,
384                    })
385                }
386                UpdateOneof::Transaction(message) => {
387                    let transaction = message
388                        .transaction
389                        .ok_or(MessageParseError::FieldNotDefined("transaction"))?;
390                    let meta = transaction
391                        .meta
392                        .as_ref()
393                        .ok_or(MessageParseError::FieldNotDefined("meta"))?;
394
395                    let account_keys =
396                        MessageTransaction::gen_account_keys_prost(&transaction, meta)?;
397                    let account_keys_capacity = account_keys.capacity();
398
399                    Message::Transaction(MessageTransaction::Prost {
400                        error: meta.err.clone(),
401                        account_keys,
402                        transaction,
403                        slot: message.slot,
404                        created_at,
405                        size: encoded_len + SIGNATURE_BYTES + account_keys_capacity * PUBKEY_BYTES,
406                    })
407                }
408                UpdateOneof::TransactionStatus(_) => {
409                    return Err(MessageParseError::InvalidUpdateMessage("TransactionStatus"));
410                }
411                UpdateOneof::Entry(entry) => Message::Entry(MessageEntry::Prost {
412                    entry,
413                    created_at,
414                    size: encoded_len,
415                }),
416                UpdateOneof::BlockMeta(block_meta) => {
417                    let block_height = block_meta
418                        .block_height
419                        .map(|v| v.block_height)
420                        .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
421                    Message::BlockMeta(MessageBlockMeta::Prost {
422                        block_meta,
423                        block_height,
424                        created_at,
425                        size: encoded_len,
426                    })
427                }
428                UpdateOneof::Block(message) => {
429                    let accounts = message
430                        .accounts
431                        .into_iter()
432                        .map(|account| {
433                            let encoded_len = account.encoded_len();
434                            Ok(Arc::new(MessageAccount::Prost {
435                                pubkey: account
436                                    .pubkey
437                                    .as_slice()
438                                    .try_into()
439                                    .map_err(|_| MessageParseError::InvalidPubkey)?,
440                                owner: account
441                                    .owner
442                                    .as_slice()
443                                    .try_into()
444                                    .map_err(|_| MessageParseError::InvalidPubkey)?,
445                                account,
446                                slot: message.slot,
447                                is_startup: false,
448                                created_at,
449                                size: PUBKEY_BYTES + PUBKEY_BYTES + encoded_len + 32,
450                            }))
451                        })
452                        .collect::<Result<_, MessageParseError>>()?;
453
454                    let transactions = message
455                        .transactions
456                        .into_iter()
457                        .map(|transaction| {
458                            let meta = transaction
459                                .meta
460                                .as_ref()
461                                .ok_or(MessageParseError::FieldNotDefined("meta"))?;
462
463                            let account_keys =
464                                MessageTransaction::gen_account_keys_prost(&transaction, meta)?;
465                            let account_keys_capacity = account_keys.capacity();
466
467                            Ok(Arc::new(MessageTransaction::Prost {
468                                error: meta.err.clone(),
469                                account_keys,
470                                transaction,
471                                slot: message.slot,
472                                created_at,
473                                size: encoded_len
474                                    + SIGNATURE_BYTES
475                                    + account_keys_capacity * PUBKEY_BYTES,
476                            }))
477                        })
478                        .collect::<Result<_, MessageParseError>>()?;
479
480                    let entries = message
481                        .entries
482                        .into_iter()
483                        .map(|entry| {
484                            let encoded_len = entry.encoded_len();
485                            Arc::new(MessageEntry::Prost {
486                                entry,
487                                created_at,
488                                size: encoded_len,
489                            })
490                        })
491                        .collect();
492
493                    let block_meta = SubscribeUpdateBlockMeta {
494                        slot: message.slot,
495                        blockhash: message.blockhash,
496                        rewards: message.rewards,
497                        block_time: message.block_time,
498                        block_height: message.block_height,
499                        parent_slot: message.parent_slot,
500                        parent_blockhash: message.parent_blockhash,
501                        executed_transaction_count: message.executed_transaction_count,
502                        entries_count: message.entries_count,
503                    };
504                    let encoded_len = block_meta.encoded_len();
505                    let block_height = block_meta
506                        .block_height
507                        .map(|v| v.block_height)
508                        .ok_or(MessageParseError::FieldNotDefined("block_height"))?;
509
510                    Message::Block(MessageBlock {
511                        accounts,
512                        transactions,
513                        entries,
514                        block_meta: Arc::new(MessageBlockMeta::Prost {
515                            block_meta,
516                            block_height,
517                            created_at,
518                            size: encoded_len,
519                        }),
520                        created_at: MessageBlockCreatedAt::Prost(created_at),
521                    })
522                }
523                UpdateOneof::Ping(_) => {
524                    return Err(MessageParseError::InvalidUpdateMessage("Ping"));
525                }
526                UpdateOneof::Pong(_) => {
527                    return Err(MessageParseError::InvalidUpdateMessage("Pong"));
528                }
529            },
530        )
531    }
532}
533
534#[derive(Debug, Clone)]
535pub enum MessageSlot {
536    Limited {
537        slot: Slot,
538        parent: Option<Slot>,
539        status: SlotStatus,
540        dead_error: Option<Range<usize>>,
541        created_at: Timestamp,
542        buffer: Vec<u8>,
543        range: Range<usize>,
544    },
545    Prost {
546        slot: Slot,
547        parent: Option<Slot>,
548        status: SlotStatus,
549        dead_error: Option<String>,
550        created_at: Timestamp,
551        size: usize,
552    },
553}
554
555impl MessageSlot {
556    pub const fn encoding(&self) -> MessageParserEncoding {
557        match self {
558            Self::Limited { .. } => MessageParserEncoding::Limited,
559            Self::Prost { .. } => MessageParserEncoding::Prost,
560        }
561    }
562
563    pub const fn created_at(&self) -> MessageBlockCreatedAt {
564        match self {
565            Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
566            Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
567        }
568    }
569
570    pub const fn slot(&self) -> Slot {
571        match self {
572            Self::Limited { slot, .. } => *slot,
573            Self::Prost { slot, .. } => *slot,
574        }
575    }
576
577    pub fn size(&self) -> usize {
578        match self {
579            Self::Limited { buffer, .. } => buffer.len() + 64,
580            Self::Prost { size, .. } => *size,
581        }
582    }
583
584    pub const fn status(&self) -> SlotStatus {
585        match self {
586            Self::Limited { status, .. } => *status,
587            Self::Prost { status, .. } => *status,
588        }
589    }
590
591    pub const fn parent(&self) -> Option<Slot> {
592        match self {
593            Self::Limited { parent, .. } => *parent,
594            Self::Prost { parent, .. } => *parent,
595        }
596    }
597
598    pub fn dead_error(&self) -> Option<&str> {
599        match self {
600            Self::Limited {
601                dead_error, buffer, ..
602            } => dead_error.as_ref().map(|range| unsafe {
603                std::str::from_utf8_unchecked(&buffer.as_slice()[range.start..range.end])
604            }),
605            Self::Prost { dead_error, .. } => dead_error.as_deref(),
606        }
607    }
608}
609
610#[allow(clippy::large_enum_variant)]
611#[derive(Debug, Clone)]
612#[cfg_attr(test, derive(PartialEq))]
613pub enum MessageAccount {
614    Limited {
615        pubkey: Pubkey,
616        owner: Pubkey,
617        lamports: u64,
618        executable: bool,
619        rent_epoch: Epoch,
620        data: Range<usize>,
621        txn_signature_offset: Option<usize>,
622        write_version: usize,
623        slot: Slot,
624        is_startup: bool,
625        created_at: Timestamp,
626        buffer: Vec<u8>,
627        account_offset: usize,
628        range: Range<usize>,
629    },
630    Prost {
631        pubkey: Pubkey,
632        owner: Pubkey,
633        account: SubscribeUpdateAccountInfo,
634        slot: Slot,
635        is_startup: bool,
636        created_at: Timestamp,
637        size: usize,
638    },
639}
640
641impl MessageAccount {
642    pub const fn encoding(&self) -> MessageParserEncoding {
643        match self {
644            Self::Limited { .. } => MessageParserEncoding::Limited,
645            Self::Prost { .. } => MessageParserEncoding::Prost,
646        }
647    }
648
649    pub const fn slot(&self) -> Slot {
650        match self {
651            Self::Limited { slot, .. } => *slot,
652            Self::Prost { slot, .. } => *slot,
653        }
654    }
655
656    pub const fn created_at(&self) -> MessageBlockCreatedAt {
657        match self {
658            Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
659            Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
660        }
661    }
662
663    pub fn size(&self) -> usize {
664        match self {
665            Self::Limited { buffer, .. } => buffer.len() + PUBKEY_BYTES * 2 + 86,
666            Self::Prost { size, .. } => *size,
667        }
668    }
669
670    pub const fn pubkey(&self) -> &Pubkey {
671        match self {
672            Self::Limited { pubkey, .. } => pubkey,
673            Self::Prost { pubkey, .. } => pubkey,
674        }
675    }
676
677    pub fn write_version(&self) -> u64 {
678        match self {
679            Self::Limited {
680                write_version,
681                buffer,
682                ..
683            } => {
684                let mut buffer = &buffer.as_slice()[*write_version..];
685                decode_varint(&mut buffer).expect("already verified")
686            }
687            Self::Prost { account, .. } => account.write_version,
688        }
689    }
690
691    pub fn txn_signature(&self) -> Option<&[u8]> {
692        match self {
693            MessageAccount::Limited {
694                txn_signature_offset,
695                buffer,
696                ..
697            } => txn_signature_offset.map(|offset| &buffer.as_slice()[offset..offset + 64]),
698            MessageAccount::Prost { account, .. } => account.txn_signature.as_deref(),
699        }
700    }
701
702    pub const fn nonempty_txn_signature(&self) -> bool {
703        match self {
704            Self::Limited {
705                txn_signature_offset,
706                ..
707            } => txn_signature_offset.is_some(),
708            Self::Prost { account, .. } => account.txn_signature.is_some(),
709        }
710    }
711
712    pub fn update_write_version(&mut self, write_version: u64) {
713        match self {
714            Self::Limited {
715                buffer,
716                range,
717                account_offset,
718                write_version: write_version_current,
719                data,
720                txn_signature_offset,
721                ..
722            } => {
723                // calculate current and new len of write_version
724                let mut buf = &mut &buffer.as_slice()[*write_version_current..];
725                let start = buf.remaining();
726                decode_varint(&mut buf).expect("already verified");
727                let wv_size_current = start - buf.remaining();
728                let wv_size_new = encoded_len_varint(write_version);
729
730                // calculate current and new len of account msg
731                let mut buf = &mut &buffer.as_slice()[*account_offset..];
732                let start = buf.remaining();
733                let msg_size = decode_varint(&mut buf).expect("already verified");
734                let msg_size_current = start - buf.remaining();
735                let msg_size = msg_size + wv_size_new as u64 - wv_size_current as u64;
736                let msg_size_new = encoded_len_varint(msg_size);
737
738                // resize if required
739                let new_end =
740                    range.end + msg_size_new - msg_size_current + wv_size_new - wv_size_current;
741                if new_end > buffer.len() {
742                    buffer.resize(new_end, 0);
743                }
744
745                // copy data before write_version
746                unsafe {
747                    let end_current = *account_offset + msg_size_current;
748                    let end_new = *account_offset + msg_size_new;
749                    std::ptr::copy(
750                        buffer.as_ptr().add(end_current),
751                        buffer.as_mut_ptr().add(end_new),
752                        *write_version_current - end_current,
753                    );
754                }
755
756                // copy data after write_version
757                let write_version_new = *write_version_current + msg_size_new - msg_size_current;
758                unsafe {
759                    let end_current = *write_version_current + wv_size_current;
760                    let end_new = write_version_new + wv_size_new;
761                    std::ptr::copy(
762                        buffer.as_ptr().add(end_current),
763                        buffer.as_mut_ptr().add(end_new),
764                        range.end - end_current,
765                    );
766                }
767
768                // save new message size and write_version
769                encode_varint(msg_size, &mut &mut buffer.as_mut_slice()[*account_offset..]);
770                encode_varint(
771                    write_version,
772                    &mut &mut buffer.as_mut_slice()[write_version_new..],
773                );
774
775                // update offsets
776                range.end = new_end;
777                // update msg_size anyway
778                data.start = data.start + msg_size_new - msg_size_current;
779                data.end = data.end + msg_size_new - msg_size_current;
780                if data.start > write_version_new {
781                    data.start = data.start + wv_size_new - wv_size_current;
782                    data.end = data.end + wv_size_new - wv_size_current;
783                }
784                if let Some(txn_signature_offset) = txn_signature_offset {
785                    *txn_signature_offset = *txn_signature_offset + msg_size_new - msg_size_current;
786                    if *txn_signature_offset > write_version_new {
787                        *txn_signature_offset =
788                            *txn_signature_offset + wv_size_new - wv_size_current;
789                    }
790                }
791                *write_version_current = write_version_new;
792            }
793            Self::Prost { account, size, .. } => {
794                *size = *size + encoded_len_varint(write_version)
795                    - encoded_len_varint(account.write_version);
796                account.write_version = write_version;
797            }
798        }
799    }
800}
801
802impl ReadableAccount for MessageAccount {
803    fn lamports(&self) -> u64 {
804        match self {
805            Self::Limited { lamports, .. } => *lamports,
806            Self::Prost { account, .. } => account.lamports,
807        }
808    }
809
810    fn data(&self) -> &[u8] {
811        match self {
812            Self::Limited { data, buffer, .. } => &buffer.as_slice()[data.start..data.end],
813            Self::Prost { account, .. } => &account.data,
814        }
815    }
816
817    fn owner(&self) -> &Pubkey {
818        match self {
819            Self::Limited { owner, .. } => owner,
820            Self::Prost { owner, .. } => owner,
821        }
822    }
823
824    fn executable(&self) -> bool {
825        match self {
826            Self::Limited { executable, .. } => *executable,
827            Self::Prost { account, .. } => account.executable,
828        }
829    }
830
831    fn rent_epoch(&self) -> Epoch {
832        match self {
833            Self::Limited { rent_epoch, .. } => *rent_epoch,
834            Self::Prost { account, .. } => account.rent_epoch,
835        }
836    }
837}
838
839#[allow(clippy::large_enum_variant)]
840#[derive(Debug, Clone)]
841pub enum MessageTransaction {
842    Limited {
843        signature_offset: Option<usize>,
844        error: Option<TransactionError>,
845        account_keys: HashSet<Pubkey>,
846        is_vote: bool,
847        index: u64,
848        transaction_range: Range<usize>,
849        transaction: OnceLock<Option<SubscribeUpdateTransactionInfo>>,
850        slot: Slot,
851        created_at: Timestamp,
852        buffer: Vec<u8>,
853        range: Range<usize>,
854    },
855    Prost {
856        error: Option<TransactionError>,
857        account_keys: HashSet<Pubkey>,
858        transaction: SubscribeUpdateTransactionInfo,
859        slot: Slot,
860        created_at: Timestamp,
861        size: usize,
862    },
863}
864
865impl MessageTransaction {
866    pub const fn encoding(&self) -> MessageParserEncoding {
867        match self {
868            Self::Limited { .. } => MessageParserEncoding::Limited,
869            Self::Prost { .. } => MessageParserEncoding::Prost,
870        }
871    }
872
873    pub const fn slot(&self) -> Slot {
874        match self {
875            Self::Limited { slot, .. } => *slot,
876            Self::Prost { slot, .. } => *slot,
877        }
878    }
879
880    pub const fn created_at(&self) -> MessageBlockCreatedAt {
881        match self {
882            Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
883            Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
884        }
885    }
886
887    pub fn size(&self) -> usize {
888        match self {
889            Self::Limited {
890                account_keys,
891                buffer,
892                ..
893            } => buffer.len() * 2 + account_keys.capacity() * PUBKEY_BYTES,
894            Self::Prost { size, .. } => *size,
895        }
896    }
897
898    pub fn gen_account_keys_prost(
899        transaction: &SubscribeUpdateTransactionInfo,
900        meta: &TransactionStatusMeta,
901    ) -> Result<HashSet<Pubkey>, MessageParseError> {
902        let mut account_keys = HashSet::new();
903
904        // static account keys
905        if let Some(pubkeys) = transaction
906            .transaction
907            .as_ref()
908            .ok_or(MessageParseError::FieldNotDefined("transaction"))?
909            .message
910            .as_ref()
911            .map(|msg| msg.account_keys.as_slice())
912        {
913            for pubkey in pubkeys {
914                account_keys.insert(
915                    Pubkey::try_from(pubkey.as_slice())
916                        .map_err(|_| MessageParseError::InvalidPubkey)?,
917                );
918            }
919        }
920        // dynamic account keys
921        for pubkey in meta.loaded_writable_addresses.iter() {
922            account_keys.insert(
923                Pubkey::try_from(pubkey.as_slice())
924                    .map_err(|_| MessageParseError::InvalidPubkey)?,
925            );
926        }
927        for pubkey in meta.loaded_readonly_addresses.iter() {
928            account_keys.insert(
929                Pubkey::try_from(pubkey.as_slice())
930                    .map_err(|_| MessageParseError::InvalidPubkey)?,
931            );
932        }
933
934        Ok(account_keys)
935    }
936
937    pub fn signature(&self) -> Signature {
938        Signature::from(
939            <[u8; SIGNATURE_BYTES]>::try_from(self.signature_ref())
940                .expect("signature must be 64 bytes"),
941        )
942    }
943
944    pub fn signature_ref(&self) -> &[u8] {
945        match self {
946            Self::Limited {
947                signature_offset,
948                buffer,
949                ..
950            } => match signature_offset {
951                Some(offset) => &buffer[*offset..*offset + SIGNATURE_BYTES],
952                None => panic!("transaction should have valid signature"),
953            },
954            Self::Prost { transaction, .. } => &transaction.signature,
955        }
956    }
957
958    pub const fn vote(&self) -> bool {
959        match self {
960            Self::Limited { is_vote, .. } => *is_vote,
961            Self::Prost { transaction, .. } => transaction.is_vote,
962        }
963    }
964
965    pub const fn index(&self) -> u64 {
966        match self {
967            Self::Limited { index, .. } => *index,
968            Self::Prost { transaction, .. } => transaction.index,
969        }
970    }
971
972    pub const fn failed(&self) -> bool {
973        match self {
974            Self::Limited { error, .. } => error.is_some(),
975            Self::Prost { error, .. } => error.is_some(),
976        }
977    }
978
979    pub const fn error(&self) -> &Option<TransactionError> {
980        match self {
981            Self::Limited { error, .. } => error,
982            Self::Prost { error, .. } => error,
983        }
984    }
985
986    fn transaction(&self) -> Result<&SubscribeUpdateTransactionInfo, &'static str> {
987        match self {
988            Self::Limited {
989                transaction,
990                transaction_range,
991                buffer,
992                ..
993            } => transaction
994                .get_or_init(|| {
995                    SubscribeUpdateTransactionInfo::decode(
996                        &buffer.as_slice()[transaction_range.clone()],
997                    )
998                    .ok()
999                })
1000                .as_ref()
1001                .ok_or("FailedToDecode"),
1002            Self::Prost { transaction, .. } => Ok(transaction),
1003        }
1004    }
1005
1006    pub fn transaction_meta(&self) -> Result<&TransactionStatusMeta, &'static str> {
1007        self.transaction()
1008            .and_then(|tx| tx.meta.as_ref().ok_or("FieldNotDefined"))
1009    }
1010
1011    pub fn as_versioned_transaction_with_status_meta(
1012        &self,
1013    ) -> Result<VersionedTransactionWithStatusMeta, &'static str> {
1014        Ok(VersionedTransactionWithStatusMeta {
1015            transaction: convert_from::create_tx_versioned(
1016                self.transaction()?
1017                    .transaction
1018                    .clone()
1019                    .ok_or("FieldNotDefined")?,
1020            )?,
1021            meta: convert_from::create_tx_meta(self.transaction_meta()?.clone())?,
1022        })
1023    }
1024
1025    pub const fn account_keys(&self) -> &HashSet<Pubkey> {
1026        match self {
1027            Self::Limited { account_keys, .. } => account_keys,
1028            Self::Prost { account_keys, .. } => account_keys,
1029        }
1030    }
1031}
1032
1033#[derive(Debug, Clone)]
1034pub enum MessageEntry {
1035    Limited {
1036        slot: Slot,
1037        index: u64,
1038        executed_transaction_count: u64,
1039        created_at: Timestamp,
1040        buffer: Vec<u8>,
1041        range: Range<usize>,
1042    },
1043    Prost {
1044        entry: SubscribeUpdateEntry,
1045        created_at: Timestamp,
1046        size: usize,
1047    },
1048}
1049
1050impl MessageEntry {
1051    pub const fn encoding(&self) -> MessageParserEncoding {
1052        match self {
1053            Self::Limited { .. } => MessageParserEncoding::Limited,
1054            Self::Prost { .. } => MessageParserEncoding::Prost,
1055        }
1056    }
1057
1058    pub const fn slot(&self) -> Slot {
1059        match self {
1060            Self::Limited { slot, .. } => *slot,
1061            Self::Prost { entry, .. } => entry.slot,
1062        }
1063    }
1064
1065    pub const fn created_at(&self) -> MessageBlockCreatedAt {
1066        match self {
1067            Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
1068            Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
1069        }
1070    }
1071
1072    pub fn size(&self) -> usize {
1073        match self {
1074            Self::Limited { buffer, .. } => buffer.len() + 52,
1075            Self::Prost { size, .. } => *size,
1076        }
1077    }
1078
1079    pub const fn index(&self) -> u64 {
1080        match self {
1081            Self::Limited { index, .. } => *index,
1082            Self::Prost { entry, .. } => entry.index,
1083        }
1084    }
1085
1086    pub const fn executed_transaction_count(&self) -> u64 {
1087        match self {
1088            Self::Limited {
1089                executed_transaction_count,
1090                ..
1091            } => *executed_transaction_count,
1092            Self::Prost { entry, .. } => entry.executed_transaction_count,
1093        }
1094    }
1095}
1096
1097#[derive(Debug, Clone)]
1098pub enum MessageBlockMeta {
1099    Limited {
1100        block_meta: SubscribeUpdateBlockMeta,
1101        block_height: Slot,
1102        created_at: Timestamp,
1103        buffer: Vec<u8>,
1104        range: Range<usize>,
1105    },
1106    Prost {
1107        block_meta: SubscribeUpdateBlockMeta,
1108        block_height: Slot,
1109        created_at: Timestamp,
1110        size: usize,
1111    },
1112}
1113
1114impl MessageBlockMeta {
1115    pub const fn encoding(&self) -> MessageParserEncoding {
1116        match self {
1117            Self::Limited { .. } => MessageParserEncoding::Limited,
1118            Self::Prost { .. } => MessageParserEncoding::Prost,
1119        }
1120    }
1121
1122    pub const fn slot(&self) -> Slot {
1123        match self {
1124            Self::Limited { block_meta, .. } => block_meta.slot,
1125            Self::Prost { block_meta, .. } => block_meta.slot,
1126        }
1127    }
1128
1129    pub const fn created_at(&self) -> MessageBlockCreatedAt {
1130        match self {
1131            Self::Limited { created_at, .. } => MessageBlockCreatedAt::Limited(*created_at),
1132            Self::Prost { created_at, .. } => MessageBlockCreatedAt::Prost(*created_at),
1133        }
1134    }
1135
1136    pub fn size(&self) -> usize {
1137        match self {
1138            Self::Limited { buffer, .. } => buffer.len() * 2,
1139            Self::Prost { size, .. } => *size,
1140        }
1141    }
1142
1143    // clippy bug?
1144    #[allow(clippy::missing_const_for_fn)]
1145    pub fn blockhash(&self) -> &str {
1146        match self {
1147            Self::Limited { block_meta, .. } => &block_meta.blockhash,
1148            Self::Prost { block_meta, .. } => &block_meta.blockhash,
1149        }
1150    }
1151
1152    pub const fn block_height(&self) -> Slot {
1153        match self {
1154            Self::Limited { block_height, .. } => *block_height,
1155            Self::Prost { block_height, .. } => *block_height,
1156        }
1157    }
1158
1159    pub const fn executed_transaction_count(&self) -> u64 {
1160        match self {
1161            Self::Limited { block_meta, .. } => block_meta.executed_transaction_count,
1162            Self::Prost { block_meta, .. } => block_meta.executed_transaction_count,
1163        }
1164    }
1165
1166    pub const fn entries_count(&self) -> u64 {
1167        match self {
1168            Self::Limited { block_meta, .. } => block_meta.entries_count,
1169            Self::Prost { block_meta, .. } => block_meta.entries_count,
1170        }
1171    }
1172}
1173
1174#[derive(Debug, Clone)]
1175pub struct MessageBlock {
1176    pub accounts: Vec<Arc<MessageAccount>>,
1177    pub transactions: Vec<Arc<MessageTransaction>>,
1178    pub entries: Vec<Arc<MessageEntry>>,
1179    pub block_meta: Arc<MessageBlockMeta>,
1180    pub created_at: MessageBlockCreatedAt,
1181}
1182
1183impl MessageBlock {
1184    pub const fn encoding(&self) -> MessageParserEncoding {
1185        self.created_at.encoding()
1186    }
1187
1188    pub fn slot(&self) -> Slot {
1189        self.block_meta.as_ref().slot()
1190    }
1191
1192    pub const fn created_at(&self) -> MessageBlockCreatedAt {
1193        self.created_at
1194    }
1195
1196    pub fn size(&self) -> usize {
1197        self.accounts
1198            .iter()
1199            .map(|m| m.size())
1200            .chain(self.transactions.iter().map(|m| m.size()))
1201            .chain(self.entries.iter().map(|m| m.size()))
1202            .sum::<usize>()
1203            + self.block_meta.size()
1204    }
1205
1206    pub fn as_confirmed_block(&self) -> Result<ConfirmedBlock, &'static str> {
1207        Ok(match self.block_meta.as_ref() {
1208            MessageBlockMeta::Limited { block_meta, .. } => ConfirmedBlock {
1209                previous_blockhash: block_meta.parent_blockhash.clone(),
1210                blockhash: block_meta.blockhash.clone(),
1211                parent_slot: block_meta.parent_slot,
1212                transactions: self
1213                    .transactions
1214                    .iter()
1215                    .map(|tx| {
1216                        tx.as_versioned_transaction_with_status_meta()
1217                            .map(TransactionWithStatusMeta::Complete)
1218                    })
1219                    .collect::<Result<Vec<_>, _>>()?,
1220                rewards: block_meta
1221                    .rewards
1222                    .as_ref()
1223                    .map(|r| {
1224                        r.rewards
1225                            .iter()
1226                            .cloned()
1227                            .map(convert_from::create_reward)
1228                            .collect::<Result<Vec<_>, _>>()
1229                    })
1230                    .transpose()?
1231                    .unwrap_or_default(),
1232                num_partitions: block_meta
1233                    .rewards
1234                    .as_ref()
1235                    .and_then(|r| r.num_partitions)
1236                    .map(|np| np.num_partitions),
1237                block_time: block_meta.block_time.map(|bt| bt.timestamp),
1238                block_height: block_meta.block_height.map(|bh| bh.block_height),
1239            },
1240            MessageBlockMeta::Prost { block_meta, .. } => ConfirmedBlock {
1241                previous_blockhash: block_meta.parent_blockhash.clone(),
1242                blockhash: block_meta.blockhash.clone(),
1243                parent_slot: block_meta.parent_slot,
1244                transactions: self
1245                    .transactions
1246                    .iter()
1247                    .map(|tx| {
1248                        tx.as_versioned_transaction_with_status_meta()
1249                            .map(TransactionWithStatusMeta::Complete)
1250                    })
1251                    .collect::<Result<Vec<_>, _>>()?,
1252                rewards: block_meta
1253                    .rewards
1254                    .as_ref()
1255                    .map(|r| {
1256                        r.rewards
1257                            .iter()
1258                            .cloned()
1259                            .map(convert_from::create_reward)
1260                            .collect::<Result<Vec<_>, _>>()
1261                    })
1262                    .transpose()?
1263                    .unwrap_or_default(),
1264                num_partitions: block_meta
1265                    .rewards
1266                    .as_ref()
1267                    .and_then(|r| r.num_partitions)
1268                    .map(|np| np.num_partitions),
1269                block_time: block_meta.block_time.map(|bt| bt.timestamp),
1270                block_height: block_meta.block_height.map(|bh| bh.block_height),
1271            },
1272        })
1273    }
1274}
1275
1276#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1277pub enum MessageBlockCreatedAt {
1278    Limited(Timestamp),
1279    Prost(Timestamp),
1280}
1281
1282impl From<MessageBlockCreatedAt> for Timestamp {
1283    fn from(value: MessageBlockCreatedAt) -> Self {
1284        match value {
1285            MessageBlockCreatedAt::Limited(timestamp) => timestamp,
1286            MessageBlockCreatedAt::Prost(timestamp) => timestamp,
1287        }
1288    }
1289}
1290
1291impl MessageBlockCreatedAt {
1292    pub const fn encoding(&self) -> MessageParserEncoding {
1293        match self {
1294            Self::Limited(_) => MessageParserEncoding::Limited,
1295            Self::Prost(_) => MessageParserEncoding::Prost,
1296        }
1297    }
1298
1299    pub const fn as_millis(&self) -> u64 {
1300        match self {
1301            Self::Limited(ts) => ts.seconds as u64 * 1_000 + (ts.nanos / 1_000_000) as u64,
1302            Self::Prost(ts) => ts.seconds as u64 * 1_000 + (ts.nanos / 1_000_000) as u64,
1303        }
1304    }
1305}
1306
#[cfg(test)]
mod tests {
    use {
        super::{Message, MessageAccount, MessageParserEncoding, MessageRef},
        crate::{
            config::{ConfigFilter, ConfigFilterAccounts, ConfigFilterAccountsDataSlice},
            filter::Filter,
        },
        maplit::hashmap,
        solana_account::ReadableAccount,
        solana_commitment_config::CommitmentLevel,
    };

    // Hex-encoded account update used as the fixture by every test below.
    static MESSAGE: &str = "0a0012af010aa6010a2088f1ffa3a2dfe617bdc4e3573251a322e3fcae81e5a457390e64751c00a465e210e0d54a1a2006aa09548b50476ad462f91f89a3015033264fc9abd5270020a9d142334742fb28ffffffffffffffffff013208c921f474e044612838e3e1acc2b53042405bd620fab28d3c0b78b3ead9f04d1c4d6dffeac4ffa7c679a6570b0226557c10b4c4016d937e06044b4e49d9d7916524d5dfa26297c5f638c3d11f846410bc0510e5ddaca2015a0c08e1c79ec10610ebef838601";

    /// Parses `data` with `parser` and returns the account message, asserting that the
    /// resulting variant matches the requested encoding.
    fn parse(data: Vec<u8>, parser: MessageParserEncoding) -> MessageAccount {
        let Message::Account(msg) = Message::parse(data.into(), parser).expect("valid message")
        else {
            panic!("expected account message");
        };

        let encoding_matches = match parser {
            MessageParserEncoding::Prost => matches!(msg, MessageAccount::Prost { .. }),
            MessageParserEncoding::Limited => matches!(msg, MessageAccount::Limited { .. }),
        };
        assert!(encoding_matches, "unexpected msg encoding");

        msg
    }

    /// Runs `msg` through a catch-all accounts filter (optionally with one data slice) and
    /// returns the bytes of the single resulting update.
    fn encode(msg: &MessageAccount, data_slice: Option<(u64, u64)>) -> Vec<u8> {
        let accounts_data_slice = data_slice
            .map(|(offset, length)| vec![ConfigFilterAccountsDataSlice { offset, length }])
            .unwrap_or_default();
        let config = ConfigFilter {
            accounts: hashmap! { "".to_owned() => ConfigFilterAccounts::default() },
            accounts_data_slice,
            ..Default::default()
        };
        let filter = Filter::new(&config);

        let message = Message::Account(msg.clone());
        let message_ref: MessageRef = (&message).into();

        let updates = filter.get_updates_ref(message_ref, CommitmentLevel::Processed);
        assert_eq!(updates.len(), 1, "unexpected number of updates");
        updates[0].encode_to_vec()
    }

    /// Re-encodes `msg` without a data slice, parses it back as `Limited` and checks that it
    /// equals `msg` once the raw buffer is ignored.
    fn assert_limited_roundtrip(msg: &MessageAccount) {
        let mut decoded = parse(encode(msg, None), MessageParserEncoding::Limited);
        if let (
            MessageAccount::Limited { buffer, .. },
            MessageAccount::Limited {
                buffer: buffer2, ..
            },
        ) = (msg, &mut decoded)
        {
            *buffer2 = buffer.clone(); // ignore buffer
        }
        assert_eq!(*msg, decoded, "write version update failed");
    }

    /// Same as `assert_limited_roundtrip`, but encodes with the data slice `(1, 3)` and
    /// therefore also ignores every offset that depends on the data length.
    fn assert_limited_sliced_roundtrip(msg: &MessageAccount) {
        let mut decoded = parse(encode(msg, Some((1, 3))), MessageParserEncoding::Limited);
        assert_eq!(msg.write_version(), decoded.write_version());
        assert_eq!(&msg.data()[1..4], decoded.data());
        if let (
            MessageAccount::Limited {
                buffer,
                txn_signature_offset,
                write_version,
                data,
                range,
                ..
            },
            MessageAccount::Limited {
                buffer: buffer2,
                txn_signature_offset: txn_signature_offset2,
                write_version: write_version2,
                data: data2,
                range: range2,
                ..
            },
        ) = (msg, &mut decoded)
        {
            // the signature bytes must survive even though their offset moved
            let txn_offset = txn_signature_offset.unwrap();
            let txn_offset2 = txn_signature_offset2.unwrap();
            assert_eq!(
                &buffer[txn_offset..txn_offset + 64],
                &buffer2[txn_offset2..txn_offset2 + 64]
            );
            *buffer2 = buffer.clone(); // ignore buffer
            *txn_signature_offset2 = *txn_signature_offset;
            *write_version2 = *write_version;
            *data2 = data.clone();
            *range2 = range.clone();
        }
        assert_eq!(*msg, decoded, "write version update failed");
    }

    #[test]
    fn test_limited() {
        let mut msg = parse(
            const_hex::decode(MESSAGE).expect("valid hex"),
            MessageParserEncoding::Limited,
        );
        assert_eq!(msg.write_version(), 1663633666275, "valid write version");

        // first shrink the varint, then grow it to its maximum length
        let cases = [
            (1, "dec valid write version"),
            (u64::MAX, "inc valid write version"),
        ];
        for (write_version, label) in cases {
            msg.update_write_version(write_version);
            assert_eq!(msg.write_version(), write_version, "{}", label);
            assert_limited_roundtrip(&msg);
            // check with data slice
            assert_limited_sliced_roundtrip(&msg);
        }
    }

    #[test]
    fn test_prost() {
        let mut msg = parse(
            const_hex::decode(MESSAGE).expect("valid hex"),
            MessageParserEncoding::Prost,
        );
        assert_eq!(msg.write_version(), 1663633666275, "valid write version");

        let cases = [
            (1, "dec valid write version"),
            (u64::MAX, "inc valid write version"),
        ];
        for (write_version, label) in cases {
            msg.update_write_version(write_version);
            assert_eq!(msg.write_version(), write_version, "{}", label);
            let decoded = parse(encode(&msg, None), MessageParserEncoding::Prost);
            assert_eq!(msg, decoded, "write version update failed");
        }
    }
}