aeron_rs/
counter.rs

1/*
2 * Copyright 2020 UT OVERSEAS INC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17use std::ffi::CString;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex};
20
21use crate::client_conductor::ClientConductor;
22use crate::concurrent::atomic_buffer::AtomicBuffer;
23use crate::concurrent::atomic_counter::AtomicCounter;
24use crate::utils::errors::AeronError;
25
26pub struct Counter {
27    // inherits from AtomicCounter
28    atomic_counter: AtomicCounter,
29    client_conductor: Arc<Mutex<ClientConductor>>,
30    registration_id: i64,
31    is_closed: AtomicBool,
32}
33
34impl Counter {
35    pub fn new(
36        client_conductor: Arc<Mutex<ClientConductor>>,
37        buffer: AtomicBuffer,
38        registration_id: i64,
39        counter_id: i32,
40    ) -> Self {
41        Self {
42            atomic_counter: AtomicCounter::new(buffer, counter_id),
43            client_conductor,
44            registration_id,
45            is_closed: AtomicBool::from(false),
46        }
47    }
48
49    pub fn registration_id(&self) -> i64 {
50        self.registration_id
51    }
52
53    pub fn is_closed(&self) -> bool {
54        self.is_closed.load(Ordering::SeqCst)
55    }
56
57    pub fn close(&self) {
58        self.is_closed.store(true, Ordering::SeqCst);
59    }
60
61    pub fn state(&self) -> Result<i32, AeronError> {
62        let cc = self.client_conductor.lock().expect("Mutex poisoned");
63        let cr = cc.counters_reader()?;
64        cr.counter_state(self.atomic_counter.id())
65    }
66
67    pub fn label(&self) -> Result<CString, AeronError> {
68        let cc = self.client_conductor.lock().expect("Mutex poisoned");
69        let cr = cc.counters_reader()?;
70        cr.counter_label(self.atomic_counter.id())
71    }
72
73    /// Inherited from AtomicCounter
74    pub fn id(&self) -> i32 {
75        self.atomic_counter.id()
76    }
77}
78
79impl Drop for Counter {
80    fn drop(&mut self) {
81        let _ignored = self
82            .client_conductor
83            .lock()
84            .expect("Mutex poisoned")
85            .release_counter(self.registration_id);
86    }
87}