epics_ca/channel/
get.rs

1use super::{base::UserData, Channel};
2use crate::{
3    error::{result_from_raw, Error},
4    request::{ReadRequest, Request},
5    types::RequestId,
6};
7use pin_project::{pin_project, pinned_drop};
8use std::{
9    cell::UnsafeCell,
10    future::Future,
11    marker::{PhantomData, PhantomPinned},
12    mem,
13    pin::Pin,
14    ptr,
15    task::{Context, Poll},
16};
17
18/// Callback that called when request result is ready.
19pub trait Callback: Send {
20    type Request: ReadRequest + ?Sized;
21    type Output: Send;
22
23    /// Performs some operation on request result.
24    fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error>;
25}
26
27pub(crate) enum GetState<F: Callback> {
28    Empty,
29    Pending(F),
30    Ready(Result<F::Output, Error>),
31}
32
33/// Future that performs reading from channel.
34#[must_use]
35#[pin_project(PinnedDrop)]
36pub struct Get<'a, F: Callback> {
37    owner: &'a mut Channel,
38    /// Must be locked by `owner.user_data().process` mutex
39    state: UnsafeCell<GetState<F>>,
40    started: bool,
41    #[pin]
42    _pp: PhantomPinned,
43}
44
45impl<'a, F: Callback> Get<'a, F> {
46    pub(crate) fn new(owner: &'a mut Channel, func: F) -> Self {
47        Self {
48            owner,
49            state: UnsafeCell::new(GetState::Pending(func)),
50            started: false,
51            _pp: PhantomPinned,
52        }
53    }
54
55    /// Initiate reading.
56    ///
57    /// This method can be called implicitly on the first poll.
58    /// It cannot be done in constructor because `Self` must be pinned at this point.
59    pub fn start(self: Pin<&mut Self>) -> Result<(), Error> {
60        assert!(!self.started);
61        let this = self.project();
62        let owner = this.owner;
63        owner.context().with(|| {
64            let mut proc = owner.user_data().process.lock().unwrap();
65            proc.data = this.state.get() as *mut u8;
66            result_from_raw(unsafe {
67                sys::ca_array_get_callback(
68                    F::Request::ID.raw() as _,
69                    0,
70                    owner.raw(),
71                    Some(Self::callback),
72                    proc.id() as _,
73                )
74            })
75            .map(|()| {
76                owner.context().flush_io();
77                *this.started = true
78            })
79        })
80    }
81
82    unsafe extern "C" fn callback(args: sys::event_handler_args) {
83        let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
84        let proc = user_data.process.lock().unwrap();
85        if proc.id() != args.usr as usize {
86            return;
87        }
88        let state = &mut *(proc.data as *mut GetState<F>);
89        let func = match mem::replace(state, GetState::Empty) {
90            GetState::Pending(func) => func,
91            _ => unreachable!(),
92        };
93        *state = GetState::Ready(func.apply(result_from_raw(args.status).and_then(|()| {
94            F::Request::from_ptr(
95                args.dbr as *const u8,
96                RequestId::try_from_raw(args.type_ as _).unwrap(),
97                args.count as usize,
98            )
99        })));
100        user_data.waker.wake();
101    }
102}
103
104impl<'a, F: Callback> Future for Get<'a, F> {
105    type Output = Result<F::Output, Error>;
106    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        self.owner.user_data().waker.register(cx.waker());
108        if !self.started {
109            self.start()?;
110            return Poll::Pending;
111        }
112        let this = self.project();
113        let proc = this.owner.user_data().process.lock().unwrap();
114        let state = unsafe { &mut *this.state.get() };
115        let poll = match mem::replace(state, GetState::Empty) {
116            GetState::Empty => unreachable!(),
117            GetState::Pending(func) => {
118                *state = GetState::Pending(func);
119                Poll::Pending
120            }
121            GetState::Ready(res) => match res {
122                Ok(ret) => Poll::Ready(Ok(ret)),
123                Err(err) => Poll::Ready(Err(err)),
124            },
125        };
126        drop(proc);
127        poll
128    }
129}
130
131#[pinned_drop]
132impl<'a, F: Callback> PinnedDrop for Get<'a, F> {
133    #[allow(clippy::needless_lifetimes)]
134    fn drop(self: Pin<&mut Self>) {
135        let mut proc = self.owner.user_data().process.lock().unwrap();
136        proc.change_id();
137        proc.data = ptr::null_mut();
138    }
139}
140
141/// Adapter that allows to use arbitrary function or closure as [`Callback`].
142pub struct GetFn<R, O, F = fn(Result<&R, Error>) -> Result<O, Error>>
143where
144    R: ReadRequest + ?Sized,
145    O: Send,
146    F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
147{
148    func: F,
149    _p: PhantomData<(*const R, O)>,
150}
151
152impl<R, O, F> GetFn<R, O, F>
153where
154    R: ReadRequest + ?Sized,
155    O: Send,
156    F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
157{
158    pub(crate) fn new(f: F) -> Self {
159        Self {
160            func: f,
161            _p: PhantomData,
162        }
163    }
164}
165
166unsafe impl<R, O, F> Send for GetFn<R, O, F>
167where
168    R: ReadRequest + ?Sized,
169    O: Send,
170    F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
171{
172}
173
174impl<R, O, F> Callback for GetFn<R, O, F>
175where
176    R: ReadRequest + ?Sized,
177    O: Send,
178    F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
179{
180    type Request = R;
181    type Output = O;
182    fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error> {
183        (self.func)(input)
184    }
185}