epics_ca/channel/
put.rs

1use super::{base::UserData, Channel};
2use crate::{
3    error::{result_from_raw, Error},
4    request::WriteRequest,
5};
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11
12/// Future that waits for write request is done, successfully or not.
13///
14/// *Waiting for this future to complete is optional.
15/// The write can be done successfully even if it dropped before completion.*
16pub struct Put<'a> {
17    owner: &'a mut Channel,
18}
19
20impl<'a> Unpin for Put<'a> {}
21
22impl<'a> Put<'a> {
23    pub fn new<R: WriteRequest + ?Sized>(
24        owner: &'a mut Channel,
25        request: &R,
26    ) -> Result<Self, Error> {
27        owner
28            .context()
29            .with(|| {
30                let mut proc = owner.user_data().process.lock().unwrap();
31                result_from_raw(unsafe {
32                    sys::ca_array_put_callback(
33                        R::ID.raw() as _,
34                        request.len() as _,
35                        owner.raw(),
36                        request as *const R as *const _,
37                        Some(Self::callback),
38                        proc.id() as _,
39                    )
40                })
41                .map(|()| {
42                    owner.context().flush_io();
43                    proc.put_res = None;
44                })
45            })
46            .map(|()| Self { owner })
47    }
48
49    unsafe extern "C" fn callback(args: sys::event_handler_args) {
50        let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
51        let mut proc = user_data.process.lock().unwrap();
52        if proc.id() != args.usr as usize {
53            return;
54        }
55        proc.put_res = Some(result_from_raw(args.status));
56        user_data.waker.wake();
57    }
58}
59
60impl<'a> Future for Put<'a> {
61    type Output = Result<(), Error>;
62    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        let user_data = self.owner.user_data();
64        user_data.waker.register(cx.waker());
65        let mut proc = user_data.process.lock().unwrap();
66        match proc.put_res.take() {
67            Some(status) => Poll::Ready(status),
68            None => Poll::Pending,
69        }
70    }
71}
72
73impl<'a> Drop for Put<'a> {
74    fn drop(&mut self) {
75        let mut proc = self.owner.user_data().process.lock().unwrap();
76        proc.change_id();
77        proc.put_res = None;
78    }
79}