1use super::{get::Callback, subscribe::Queue, Get, Put, Subscription};
2use crate::{
3 context::Context,
4 error::{self, result_from_raw, Error},
5 request::WriteRequest,
6 types::FieldId,
7 utils::Ptr,
8};
9use futures::{future::FusedFuture, task::AtomicWaker};
10use std::{
11 ffi::{c_void, CStr},
12 future::Future,
13 pin::Pin,
14 ptr::{self, NonNull},
15 sync::{
16 atomic::{AtomicBool, Ordering},
17 Mutex,
18 },
19 task::{Context as Cx, Poll},
20};
21
22#[derive(Debug)]
26pub struct Channel {
27 ctx: Context,
28 raw: <sys::chanId as Ptr>::NonNull,
29}
30
31unsafe impl Send for Channel where Context: Send {}
32
33impl Channel {
34 pub fn new(ctx: &Context, name: &CStr) -> Result<Self, Error> {
36 ctx.clone().with(|| {
37 let mut raw: sys::chanId = ptr::null_mut();
38 let puser = Box::leak(Box::new(UserData::new())) as *mut UserData;
39 const DEFAULT_PRIORITY: u32 = 0;
40
41 match result_from_raw(unsafe {
42 sys::ca_create_channel(
43 name.as_ptr(),
44 Some(Self::connect_callback),
45 puser as *mut c_void,
46 DEFAULT_PRIORITY,
47 &mut raw as *mut _,
48 )
49 }) {
50 Ok(()) => {
51 ctx.flush_io();
52 Ok(Channel {
53 ctx: ctx.clone(),
54 raw: NonNull::new(raw).unwrap(),
55 })
56 }
57 Err(e) => {
58 unsafe { Box::from_raw(puser) };
59 Err(e)
60 }
61 }
62 })
63 }
64 pub fn connected(&mut self) -> Connect<'_> {
66 Connect::new(self)
67 }
68 pub fn context(&self) -> &Context {
70 &self.ctx
71 }
72 pub fn raw(&self) -> sys::chanId {
74 self.raw.as_ptr()
75 }
76 pub(crate) fn user_data(&self) -> &UserData {
77 unsafe { &*(sys::ca_puser(self.raw.as_ptr()) as *const UserData) }
78 }
79 pub fn name(&self) -> &CStr {
81 unsafe { CStr::from_ptr(sys::ca_name(self.raw())) }
82 }
83 pub fn field_type(&self) -> Result<FieldId, Error> {
85 let raw = unsafe { sys::ca_field_type(self.raw()) } as i32;
86 if raw == sys::TYPENOTCONN {
87 return Err(error::DISCONN);
88 }
89 FieldId::try_from_raw(raw).ok_or(error::BADTYPE)
90 }
91 pub fn element_count(&self) -> Result<usize, Error> {
93 let count = unsafe { sys::ca_element_count(self.raw()) } as usize;
94 if count == 0 {
95 return Err(error::DISCONN);
96 }
97 Ok(count)
98 }
99 pub fn host_name(&self) -> Result<&CStr, Error> {
101 const DISCONN_HOST: &CStr =
102 unsafe { CStr::from_bytes_with_nul_unchecked(b"<disconnected>\0") };
103
104 let str = unsafe { CStr::from_ptr(sys::ca_host_name(self.raw())) };
105 if str != DISCONN_HOST {
106 Ok(str)
107 } else {
108 Err(error::DISCONN)
109 }
110 }
111}
112
113impl Drop for Channel {
114 fn drop(&mut self) {
115 self.context().with(|| {
116 let puser = self.user_data() as *const _ as *mut UserData;
117 result_from_raw(unsafe { sys::ca_clear_channel(self.raw()) }).unwrap();
118 unsafe { Box::from_raw(puser) };
119 });
120 }
121}
122
123pub(crate) struct UserData {
124 pub(crate) waker: AtomicWaker,
125 pub(crate) connected: AtomicBool,
126 pub(crate) process: Mutex<ProcessData>,
127}
128
129impl UserData {
130 fn new() -> Self {
131 Self {
132 connected: AtomicBool::new(false),
133 waker: AtomicWaker::new(),
134 process: Mutex::new(ProcessData::new()),
135 }
136 }
137}
138
139pub(crate) struct ProcessData {
140 id_counter: usize,
141 pub(crate) data: *mut u8,
142 pub(crate) put_res: Option<Result<(), Error>>,
143}
144
145impl ProcessData {
146 pub fn new() -> Self {
147 Self {
148 id_counter: 0,
149 data: ptr::null_mut(),
150 put_res: None,
151 }
152 }
153 pub fn id(&self) -> usize {
154 self.id_counter
155 }
156 pub fn change_id(&mut self) {
157 self.id_counter += 1;
158 }
159}
160
161#[must_use]
163pub struct Connect<'a> {
164 channel: Option<&'a mut Channel>,
165}
166
167impl<'a> Connect<'a> {
168 fn new(channel: &'a mut Channel) -> Self {
169 Connect {
170 channel: Some(channel),
171 }
172 }
173}
174
175impl<'a> Future for Connect<'a> {
176 type Output = ();
177 fn poll(mut self: Pin<&mut Self>, cx: &mut Cx<'_>) -> Poll<Self::Output> {
178 let channel = self.channel.take().unwrap();
179 channel.user_data().waker.register(cx.waker());
180 if channel.user_data().connected.load(Ordering::Acquire) {
181 Poll::Ready(())
182 } else {
183 self.channel.replace(channel);
184 Poll::Pending
185 }
186 }
187}
188
189impl<'a> FusedFuture for Connect<'a> {
190 fn is_terminated(&self) -> bool {
191 self.channel.is_none()
192 }
193}
194
195impl Channel {
196 unsafe extern "C" fn connect_callback(args: sys::connection_handler_args) {
197 let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
198 user_data.connected.store(
199 match args.op as _ {
200 sys::CA_OP_CONN_UP => true,
201 sys::CA_OP_CONN_DOWN => false,
202 _ => unreachable!(),
203 },
204 Ordering::Release,
205 );
206 user_data.waker.wake();
207 }
208}
209
210impl Channel {
211 pub fn put_ref<R: WriteRequest + ?Sized>(&mut self, req: &R) -> Result<Put<'_>, Error> {
213 Put::new(self, req)
214 }
215 pub fn get_with<F: Callback>(&mut self, func: F) -> Get<'_, F> {
217 Get::new(self, func)
218 }
219 pub fn subscribe_with<F: Queue>(&mut self, func: F) -> Subscription<'_, F> {
221 Subscription::new(self, func)
222 }
223}
224
#[cfg(test)]
mod tests {
    use crate::{context::UniqueContext, Channel, Context};
    use async_std::{task::sleep, test as async_test};
    use cstr::cstr;
    use futures::{select, FutureExt};
    use serial_test::serial;
    use std::{ptr, time::Duration};

    /// Connecting to the `ca:test:ai` channel must complete.
    #[async_test]
    #[serial]
    async fn connect() {
        let ctx = Context::new().unwrap();
        let mut channel = Channel::new(&ctx, cstr!("ca:test:ai")).unwrap();
        channel.connected().await;
    }

    /// A channel with the name `__nonexistent__` must not connect within 100 ms.
    #[async_test]
    async fn connect_nonexistent() {
        let mut channel =
            Channel::new(&Context::new().unwrap(), cstr!("__nonexistent__")).unwrap();
        select! {
            _ = channel.connected() => panic!(),
            _ = sleep(Duration::from_millis(100)).fuse() => (),
        }
    }

    /// The user data seen outside of the context is the same object as the one seen
    /// from inside `Context::with`.
    #[async_test]
    #[serial]
    async fn user_data() {
        let ctx = Context::new().unwrap();
        let mut channel = Channel::new(&ctx, cstr!("ca:test:ai")).unwrap();
        channel.connected().await;

        // No context is current on this thread at this point.
        assert!(UniqueContext::current().is_null());
        let outside = channel.user_data();
        ctx.with(|| {
            let inside = channel.user_data();
            assert!(ptr::eq(inside, outside));
        });
    }
}
267}