epics_ca/channel/
base.rs

1use super::{get::Callback, subscribe::Queue, Get, Put, Subscription};
2use crate::{
3    context::Context,
4    error::{self, result_from_raw, Error},
5    request::WriteRequest,
6    types::FieldId,
7    utils::Ptr,
8};
9use futures::{future::FusedFuture, task::AtomicWaker};
10use std::{
11    ffi::{c_void, CStr},
12    future::Future,
13    pin::Pin,
14    ptr::{self, NonNull},
15    sync::{
16        atomic::{AtomicBool, Ordering},
17        Mutex,
18    },
19    task::{Context as Cx, Poll},
20};
21
22/// Basic channel.
23///
24/// Channel is an entity that has a name and could be read, written or subscribed to.
25#[derive(Debug)]
26pub struct Channel {
27    ctx: Context,
28    raw: <sys::chanId as Ptr>::NonNull,
29}
30
31unsafe impl Send for Channel where Context: Send {}
32
33impl Channel {
34    /// Create channel without waiting for connection.
35    pub fn new(ctx: &Context, name: &CStr) -> Result<Self, Error> {
36        ctx.clone().with(|| {
37            let mut raw: sys::chanId = ptr::null_mut();
38            let puser = Box::leak(Box::new(UserData::new())) as *mut UserData;
39            const DEFAULT_PRIORITY: u32 = 0;
40
41            match result_from_raw(unsafe {
42                sys::ca_create_channel(
43                    name.as_ptr(),
44                    Some(Self::connect_callback),
45                    puser as *mut c_void,
46                    DEFAULT_PRIORITY,
47                    &mut raw as *mut _,
48                )
49            }) {
50                Ok(()) => {
51                    ctx.flush_io();
52                    Ok(Channel {
53                        ctx: ctx.clone(),
54                        raw: NonNull::new(raw).unwrap(),
55                    })
56                }
57                Err(e) => {
58                    unsafe { Box::from_raw(puser) };
59                    Err(e)
60                }
61            }
62        })
63    }
64    /// Wait for channel become connected.
65    pub fn connected(&mut self) -> Connect<'_> {
66        Connect::new(self)
67    }
68    /// Context of the channel.
69    pub fn context(&self) -> &Context {
70        &self.ctx
71    }
72    /// Raw channed identifier.
73    pub fn raw(&self) -> sys::chanId {
74        self.raw.as_ptr()
75    }
76    pub(crate) fn user_data(&self) -> &UserData {
77        unsafe { &*(sys::ca_puser(self.raw.as_ptr()) as *const UserData) }
78    }
79    /// Channel name.
80    pub fn name(&self) -> &CStr {
81        unsafe { CStr::from_ptr(sys::ca_name(self.raw())) }
82    }
83    /// Channel field type.
84    pub fn field_type(&self) -> Result<FieldId, Error> {
85        let raw = unsafe { sys::ca_field_type(self.raw()) } as i32;
86        if raw == sys::TYPENOTCONN {
87            return Err(error::DISCONN);
88        }
89        FieldId::try_from_raw(raw).ok_or(error::BADTYPE)
90    }
91    /// Number of elements in the channel.
92    pub fn element_count(&self) -> Result<usize, Error> {
93        let count = unsafe { sys::ca_element_count(self.raw()) } as usize;
94        if count == 0 {
95            return Err(error::DISCONN);
96        }
97        Ok(count)
98    }
99    /// Name of the host which serves the channel.
100    pub fn host_name(&self) -> Result<&CStr, Error> {
101        const DISCONN_HOST: &CStr =
102            unsafe { CStr::from_bytes_with_nul_unchecked(b"<disconnected>\0") };
103
104        let str = unsafe { CStr::from_ptr(sys::ca_host_name(self.raw())) };
105        if str != DISCONN_HOST {
106            Ok(str)
107        } else {
108            Err(error::DISCONN)
109        }
110    }
111}
112
113impl Drop for Channel {
114    fn drop(&mut self) {
115        self.context().with(|| {
116            let puser = self.user_data() as *const _ as *mut UserData;
117            result_from_raw(unsafe { sys::ca_clear_channel(self.raw()) }).unwrap();
118            unsafe { Box::from_raw(puser) };
119        });
120    }
121}
122
123pub(crate) struct UserData {
124    pub(crate) waker: AtomicWaker,
125    pub(crate) connected: AtomicBool,
126    pub(crate) process: Mutex<ProcessData>,
127}
128
129impl UserData {
130    fn new() -> Self {
131        Self {
132            connected: AtomicBool::new(false),
133            waker: AtomicWaker::new(),
134            process: Mutex::new(ProcessData::new()),
135        }
136    }
137}
138
139pub(crate) struct ProcessData {
140    id_counter: usize,
141    pub(crate) data: *mut u8,
142    pub(crate) put_res: Option<Result<(), Error>>,
143}
144
145impl ProcessData {
146    pub fn new() -> Self {
147        Self {
148            id_counter: 0,
149            data: ptr::null_mut(),
150            put_res: None,
151        }
152    }
153    pub fn id(&self) -> usize {
154        self.id_counter
155    }
156    pub fn change_id(&mut self) {
157        self.id_counter += 1;
158    }
159}
160
161/// Future to wait for connection.
162#[must_use]
163pub struct Connect<'a> {
164    channel: Option<&'a mut Channel>,
165}
166
167impl<'a> Connect<'a> {
168    fn new(channel: &'a mut Channel) -> Self {
169        Connect {
170            channel: Some(channel),
171        }
172    }
173}
174
175impl<'a> Future for Connect<'a> {
176    type Output = ();
177    fn poll(mut self: Pin<&mut Self>, cx: &mut Cx<'_>) -> Poll<Self::Output> {
178        let channel = self.channel.take().unwrap();
179        channel.user_data().waker.register(cx.waker());
180        if channel.user_data().connected.load(Ordering::Acquire) {
181            Poll::Ready(())
182        } else {
183            self.channel.replace(channel);
184            Poll::Pending
185        }
186    }
187}
188
189impl<'a> FusedFuture for Connect<'a> {
190    fn is_terminated(&self) -> bool {
191        self.channel.is_none()
192    }
193}
194
195impl Channel {
196    unsafe extern "C" fn connect_callback(args: sys::connection_handler_args) {
197        let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
198        user_data.connected.store(
199            match args.op as _ {
200                sys::CA_OP_CONN_UP => true,
201                sys::CA_OP_CONN_DOWN => false,
202                _ => unreachable!(),
203            },
204            Ordering::Release,
205        );
206        user_data.waker.wake();
207    }
208}
209
210impl Channel {
211    /// Make write request by reference.
212    pub fn put_ref<R: WriteRequest + ?Sized>(&mut self, req: &R) -> Result<Put<'_>, Error> {
213        Put::new(self, req)
214    }
215    /// Make read request and call closure when it's done, successfully or not.
216    pub fn get_with<F: Callback>(&mut self, func: F) -> Get<'_, F> {
217        Get::new(self, func)
218    }
219    /// Subscribe to channel updates and call closure each time when update occured.
220    pub fn subscribe_with<F: Queue>(&mut self, func: F) -> Subscription<'_, F> {
221        Subscription::new(self, func)
222    }
223}
224
#[cfg(test)]
mod tests {
    use crate::{context::UniqueContext, Channel, Context};
    use async_std::{task::sleep, test as async_test};
    use cstr::cstr;
    use futures::{select, FutureExt};
    use serial_test::serial;
    use std::{ptr, time::Duration};

    /// Connecting to the `ca:test:ai` channel completes.
    #[async_test]
    #[serial]
    async fn connect() {
        let ctx = Context::new().unwrap();
        let mut channel = Channel::new(&ctx, cstr!("ca:test:ai")).unwrap();
        channel.connected().await;
    }

    /// Connecting to a channel nobody serves must still be pending after the timeout.
    #[async_test]
    async fn connect_nonexistent() {
        let timeout = Duration::from_millis(100);
        let mut chan = Channel::new(&Context::new().unwrap(), cstr!("__nonexistent__")).unwrap();
        select! {
            _ = chan.connected() => panic!(),
            _ = sleep(timeout).fuse() => (),
        }
    }

    /// User data resolves to the same object with and without an attached context.
    #[async_test]
    #[serial]
    async fn user_data() {
        let ctx = Context::new().unwrap();
        let mut channel = Channel::new(&ctx, cstr!("ca:test:ai")).unwrap();
        channel.connected().await;

        // Test that user data can be accessed without context attachment.
        assert!(UniqueContext::current().is_null());
        let detached = channel.user_data();
        ctx.with(|| {
            let attached = channel.user_data();
            assert!(ptr::eq(attached, detached));
        });
    }
}