aeron_rs/protocol/
status_message_flyweight.rs

1/*
2 * Copyright 2020 UT OVERSEAS INC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use crate::concurrent::atomic_buffer::AtomicBuffer;
18use crate::protocol::header_flyweight::{HeaderDefn, HeaderFlyweight};
19use crate::utils::types::Index;
20
21pub const STATUS_MESSAGE_DEFN_SIZE: Index = std::mem::size_of::<StatusMessageDefn>() as Index;
22
23/**
24 * Flow/Congestion control message to send feedback from subscriptions to publications.
25 *
26 * <p>
27 *    0                   1                   2                   3
28 *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
29 *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
30 *   |R|                 Frame Length (=header + data)               |
31 *   +---------------+-+-------------+-------------------------------+
32 *   |   Version     |S|    Flags    |          Type (=0x03)         |
33 *   +---------------+-+-------------+-------------------------------+
34 *   |                          Session ID                           |
35 *   +---------------------------------------------------------------+
36 *   |                           Stream ID                           |
37 *   +---------------------------------------------------------------+
38 *   |                      Consumption Term ID                      |
39 *   +---------------------------------------------------------------+
40 *   |R|                  Consumption Term Offset                    |
41 *   +---------------------------------------------------------------+
42 *   |                        Receiver Window                        |
43 *   +---------------------------------------------------------------+
44 *   |                  Application Specific Feedback               ...
45 *  ...                                                              |
46 *   +---------------------------------------------------------------+
47 */
48#[repr(C, packed(4))]
49#[derive(Copy, Clone)]
50pub struct StatusMessageDefn {
51    header: HeaderDefn,
52    session_id: i32,
53    stream_id: i32,
54    consumption_term_id: i32,
55    consumption_term_offset: i32,
56    receiver_window: i32,
57}
58
59#[allow(dead_code)]
60pub struct StatusMessageFlyweight {
61    header_flyweight: HeaderFlyweight,
62    m_struct: *mut StatusMessageDefn, // This is actually part of above field memory space
63}
64
65impl StatusMessageFlyweight {
66    pub fn new(buffer: AtomicBuffer, offset: Index) -> Self {
67        let header_flyweight = HeaderFlyweight::new(buffer, offset);
68        let m_struct = header_flyweight.flyweight.overlay_struct::<StatusMessageDefn>(0);
69        Self {
70            header_flyweight,
71            m_struct,
72        }
73    }
74
75    // Getters
76    #[inline]
77    pub fn session_id(&self) -> i32 {
78        unsafe { (*self.m_struct).session_id }
79    }
80
81    #[inline]
82    pub fn stream_id(&self) -> i32 {
83        unsafe { (*self.m_struct).stream_id }
84    }
85
86    #[inline]
87    pub fn consumption_term_id(&self) -> i32 {
88        unsafe { (*self.m_struct).consumption_term_id }
89    }
90
91    #[inline]
92    pub fn consumption_term_offset(&self) -> i32 {
93        unsafe { (*self.m_struct).consumption_term_offset }
94    }
95
96    #[inline]
97    pub fn receiver_window(&self) -> i32 {
98        unsafe { (*self.m_struct).receiver_window }
99    }
100
101    // Setters
102    #[inline]
103    pub fn set_session_id(&mut self, value: i32) {
104        unsafe {
105            (*self.m_struct).session_id = value;
106        }
107    }
108
109    #[inline]
110    pub fn set_stream_id(&mut self, value: i32) {
111        unsafe {
112            (*self.m_struct).stream_id = value;
113        }
114    }
115
116    #[inline]
117    pub fn set_consumption_term_id(&mut self, value: i32) {
118        unsafe {
119            (*self.m_struct).consumption_term_id = value;
120        }
121    }
122
123    #[inline]
124    pub fn set_consumption_term_offset(&mut self, value: i32) {
125        unsafe {
126            (*self.m_struct).consumption_term_offset = value;
127        }
128    }
129
130    #[inline]
131    pub fn set_receiver_window(&mut self, value: i32) {
132        unsafe {
133            (*self.m_struct).receiver_window = value;
134        }
135    }
136
137    #[inline]
138    pub const fn header_length() -> Index {
139        STATUS_MESSAGE_DEFN_SIZE
140    }
141}