aeron_rs/command/
subscription_message_flyweight.rs

1/*
2 * Copyright 2020 UT OVERSEAS INC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#[cfg(test)]
17use std::ffi::CString;
18
19use crate::command::correlated_message_flyweight::{CorrelatedMessageDefn, CorrelatedMessageFlyweight};
20use crate::concurrent::atomic_buffer::AtomicBuffer;
21use crate::utils::types::Index;
22
23/**
24 * Control message for adding a subscription.
25 *
26 * <p>
27 * 0                   1                   2                   3
28 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
29 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
30 * |                         Client ID                             |
31 * |                                                               |
32 * +---------------------------------------------------------------+
33 * |                       Correlation ID                          |
34 * |                                                               |
35 * +---------------------------------------------------------------+
36 * |                 Registration Correlation ID                   |
37 * |                                                               |
38 * +---------------------------------------------------------------+
39 * |                         Stream Id                             |
40 * +---------------------------------------------------------------+
41 * |                      Channel Length                           |
42 * +---------------------------------------------------------------+
43 * |                          Channel                             ...
44 * ...                                                             |
45 * +---------------------------------------------------------------+
46 */
47#[repr(C, packed(4))]
48#[derive(Copy, Clone)]
49pub(crate) struct SubscriptionMessageDefn {
50    correlated_message: CorrelatedMessageDefn,
51    registration_correlation_id: i64,
52    stream_id: i32,
53    channel_length: i32,
54    channel_data: [i8; 1],
55}
56
57pub(crate) struct SubscriptionMessageFlyweight {
58    correlated_message_flyweight: CorrelatedMessageFlyweight,
59    m_struct: *mut SubscriptionMessageDefn,
60}
61
62impl SubscriptionMessageFlyweight {
63    pub fn new(buffer: AtomicBuffer, offset: Index) -> Self {
64        let correlated_message_flyweight = CorrelatedMessageFlyweight::new(buffer, offset);
65        let m_struct = correlated_message_flyweight
66            .flyweight
67            .overlay_struct::<SubscriptionMessageDefn>(0);
68        Self {
69            correlated_message_flyweight,
70            m_struct,
71        }
72    }
73
74    #[cfg(test)]
75    pub fn stream_id(&self) -> i32 {
76        unsafe { (*self.m_struct).stream_id }
77    }
78
79    #[inline]
80    pub fn set_registration_correlation_id(&mut self, value: i64) {
81        unsafe {
82            (*self.m_struct).registration_correlation_id = value;
83        }
84    }
85
86    #[inline]
87    pub fn set_stream_id(&mut self, value: i32) {
88        unsafe {
89            (*self.m_struct).stream_id = value;
90        }
91    }
92
93    #[cfg(test)]
94    pub fn channel(&self) -> CString {
95        self.correlated_message_flyweight
96            .flyweight
97            .string_get(offset_of!(SubscriptionMessageDefn, channel_length) as Index)
98    }
99
100    #[inline]
101    pub fn set_channel(&mut self, value: &[u8]) {
102        self.correlated_message_flyweight
103            .flyweight
104            .string_put(offset_of!(SubscriptionMessageDefn, channel_length) as Index, value);
105    }
106
107    #[inline]
108    pub fn length(&self) -> Index {
109        unsafe { offset_of!(SubscriptionMessageDefn, channel_data) as Index + (*self.m_struct).channel_length as Index }
110    }
111
112    // Parent Getters
113    #[cfg(test)]
114    pub fn correlation_id(&self) -> i64 {
115        self.correlated_message_flyweight.correlation_id()
116    }
117
118    // Parent Setters
119
120    #[inline]
121    pub fn set_client_id(&mut self, value: i64) {
122        self.correlated_message_flyweight.set_client_id(value);
123    }
124
125    #[inline]
126    pub fn set_correlation_id(&mut self, value: i64) {
127        self.correlated_message_flyweight.set_correlation_id(value);
128    }
129}