aeron_rs/command/subscription_message_flyweight.rs
1/*
2 * Copyright 2020 UT OVERSEAS INC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#[cfg(test)]
17use std::ffi::CString;
18
19use crate::command::correlated_message_flyweight::{CorrelatedMessageDefn, CorrelatedMessageFlyweight};
20use crate::concurrent::atomic_buffer::AtomicBuffer;
21use crate::utils::types::Index;
22
23/**
24 * Control message for adding a subscription.
25 *
26 * <p>
27 * 0 1 2 3
28 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
29 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
30 * | Client ID |
31 * | |
32 * +---------------------------------------------------------------+
33 * | Correlation ID |
34 * | |
35 * +---------------------------------------------------------------+
36 * | Registration Correlation ID |
37 * | |
38 * +---------------------------------------------------------------+
39 * | Stream Id |
40 * +---------------------------------------------------------------+
41 * | Channel Length |
42 * +---------------------------------------------------------------+
43 * | Channel ...
44 * ... |
45 * +---------------------------------------------------------------+
46 */
47#[repr(C, packed(4))]
48#[derive(Copy, Clone)]
49pub(crate) struct SubscriptionMessageDefn {
50 correlated_message: CorrelatedMessageDefn,
51 registration_correlation_id: i64,
52 stream_id: i32,
53 channel_length: i32,
54 channel_data: [i8; 1],
55}
56
57pub(crate) struct SubscriptionMessageFlyweight {
58 correlated_message_flyweight: CorrelatedMessageFlyweight,
59 m_struct: *mut SubscriptionMessageDefn,
60}
61
62impl SubscriptionMessageFlyweight {
63 pub fn new(buffer: AtomicBuffer, offset: Index) -> Self {
64 let correlated_message_flyweight = CorrelatedMessageFlyweight::new(buffer, offset);
65 let m_struct = correlated_message_flyweight
66 .flyweight
67 .overlay_struct::<SubscriptionMessageDefn>(0);
68 Self {
69 correlated_message_flyweight,
70 m_struct,
71 }
72 }
73
74 #[cfg(test)]
75 pub fn stream_id(&self) -> i32 {
76 unsafe { (*self.m_struct).stream_id }
77 }
78
79 #[inline]
80 pub fn set_registration_correlation_id(&mut self, value: i64) {
81 unsafe {
82 (*self.m_struct).registration_correlation_id = value;
83 }
84 }
85
86 #[inline]
87 pub fn set_stream_id(&mut self, value: i32) {
88 unsafe {
89 (*self.m_struct).stream_id = value;
90 }
91 }
92
93 #[cfg(test)]
94 pub fn channel(&self) -> CString {
95 self.correlated_message_flyweight
96 .flyweight
97 .string_get(offset_of!(SubscriptionMessageDefn, channel_length) as Index)
98 }
99
100 #[inline]
101 pub fn set_channel(&mut self, value: &[u8]) {
102 self.correlated_message_flyweight
103 .flyweight
104 .string_put(offset_of!(SubscriptionMessageDefn, channel_length) as Index, value);
105 }
106
107 #[inline]
108 pub fn length(&self) -> Index {
109 unsafe { offset_of!(SubscriptionMessageDefn, channel_data) as Index + (*self.m_struct).channel_length as Index }
110 }
111
112 // Parent Getters
113 #[cfg(test)]
114 pub fn correlation_id(&self) -> i64 {
115 self.correlated_message_flyweight.correlation_id()
116 }
117
118 // Parent Setters
119
120 #[inline]
121 pub fn set_client_id(&mut self, value: i64) {
122 self.correlated_message_flyweight.set_client_id(value);
123 }
124
125 #[inline]
126 pub fn set_correlation_id(&mut self, value: i64) {
127 self.correlated_message_flyweight.set_correlation_id(value);
128 }
129}