1use std::ffi::CString;
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex};
20
21use crate::client_conductor::ClientConductor;
22use crate::concurrent::atomic_buffer::AtomicBuffer;
23use crate::concurrent::atomic_counter::AtomicCounter;
24use crate::utils::errors::AeronError;
25
26pub struct Counter {
27 atomic_counter: AtomicCounter,
29 client_conductor: Arc<Mutex<ClientConductor>>,
30 registration_id: i64,
31 is_closed: AtomicBool,
32}
33
34impl Counter {
35 pub fn new(
36 client_conductor: Arc<Mutex<ClientConductor>>,
37 buffer: AtomicBuffer,
38 registration_id: i64,
39 counter_id: i32,
40 ) -> Self {
41 Self {
42 atomic_counter: AtomicCounter::new(buffer, counter_id),
43 client_conductor,
44 registration_id,
45 is_closed: AtomicBool::from(false),
46 }
47 }
48
49 pub fn registration_id(&self) -> i64 {
50 self.registration_id
51 }
52
53 pub fn is_closed(&self) -> bool {
54 self.is_closed.load(Ordering::SeqCst)
55 }
56
57 pub fn close(&self) {
58 self.is_closed.store(true, Ordering::SeqCst);
59 }
60
61 pub fn state(&self) -> Result<i32, AeronError> {
62 let cc = self.client_conductor.lock().expect("Mutex poisoned");
63 let cr = cc.counters_reader()?;
64 cr.counter_state(self.atomic_counter.id())
65 }
66
67 pub fn label(&self) -> Result<CString, AeronError> {
68 let cc = self.client_conductor.lock().expect("Mutex poisoned");
69 let cr = cc.counters_reader()?;
70 cr.counter_label(self.atomic_counter.id())
71 }
72
73 pub fn id(&self) -> i32 {
75 self.atomic_counter.id()
76 }
77}
78
79impl Drop for Counter {
80 fn drop(&mut self) {
81 let _ignored = self
82 .client_conductor
83 .lock()
84 .expect("Mutex poisoned")
85 .release_counter(self.registration_id);
86 }
87}