epics_ca/channel/
subscribe.rs

1use super::{base::UserData, Channel};
2use crate::{
3    error::{result_from_raw, Error},
4    request::{ReadRequest, Request},
5    types::{EventMask, RequestId},
6};
7use futures::Stream;
8use pin_project::{pin_project, pinned_drop};
9use std::{
10    cell::UnsafeCell,
11    collections::VecDeque,
12    marker::{PhantomData, PhantomPinned},
13    pin::Pin,
14    ptr,
15    task::{Context, Poll},
16};
17
18/// Subscription queue.
19pub trait Queue: Send {
20    type Request: ReadRequest + ?Sized;
21    type Output: Send + Sized;
22
23    /// Called immediately on channel updates.
24    fn push(&mut self, input: Result<&Self::Request, Error>);
25    /// Called when user tries to extract data from [`Subscription`] stream.
26    fn pop(&mut self) -> Option<Result<Self::Output, Error>>;
27}
28
29/// Subscription to channel.
30///
31/// Stores queue that called each time channel is updated.
32/// Provides results extracted from queue.
33///
34/// Depending on the type of the queue stored subscription may provide
35/// either the last unread value or all received values.
36#[must_use]
37#[pin_project(PinnedDrop)]
38pub struct Subscription<'a, F: Queue> {
39    owner: &'a mut Channel,
40    /// Must be locked by `owner.user_data().process` mutex
41    state: UnsafeCell<F>,
42    mask: EventMask,
43    evid: Option<sys::evid>,
44    #[pin]
45    _pp: PhantomPinned,
46}
47
48unsafe impl<'a, F: Queue> Send for Subscription<'a, F> {}
49
50impl<'a, F: Queue> Subscription<'a, F> {
51    pub(crate) fn new(owner: &'a mut Channel, func: F) -> Self {
52        Self {
53            owner,
54            state: UnsafeCell::new(func),
55            mask: EventMask::VALUE | EventMask::ALARM,
56            evid: None,
57            _pp: PhantomPinned,
58        }
59    }
60
61    /// Set kinds of channel events this subscription should be notified.
62    ///
63    /// Default event mask is [`EventMask::VALUE`]` | `[`EventMask::ALARM`].
64    ///
65    /// *You need to call this before [`start`](`Self::start`)-ing the subscription
66    /// because after it started you cannot unpin it.*
67    pub fn set_event_mask(&mut self, mask: EventMask) {
68        self.mask = mask;
69    }
70
71    /// Initiate subscription.
72    ///
73    /// **You will not receive channel update until this method was called, explicitly or implicitly.**
74    ///
75    /// This method can be called implicitly on the first poll.
76    /// It cannot be done in constructor because `Self` must be pinned at this point.
77    pub fn start(self: Pin<&mut Self>) -> Result<(), Error> {
78        assert!(self.evid.is_none());
79        let this = self.project();
80        let owner = this.owner;
81        owner.context().with(|| {
82            let mut proc = owner.user_data().process.lock().unwrap();
83            proc.data = this.state.get() as *mut u8;
84            let mut evid: sys::evid = ptr::null_mut();
85            result_from_raw(unsafe {
86                sys::ca_create_subscription(
87                    F::Request::ID.raw() as _,
88                    0,
89                    owner.raw(),
90                    this.mask.raw() as _,
91                    Some(Self::callback),
92                    proc.id() as _,
93                    &mut evid as *mut sys::evid,
94                )
95            })
96            .map(|()| {
97                owner.context().flush_io();
98                *this.evid = Some(evid);
99            })
100        })
101    }
102
103    unsafe extern "C" fn callback(args: sys::event_handler_args) {
104        let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
105        let proc = user_data.process.lock().unwrap();
106        if proc.id() != args.usr as usize {
107            return;
108        }
109        let func = &mut *(proc.data as *mut F);
110        func.push(result_from_raw(args.status).and_then(|()| {
111            F::Request::from_ptr(
112                args.dbr as *const u8,
113                RequestId::try_from_raw(args.type_ as _).unwrap(),
114                args.count as usize,
115            )
116        }));
117        drop(proc);
118        user_data.waker.wake();
119    }
120}
121
122impl<'a, F: Queue> Stream for Subscription<'a, F> {
123    type Item = Result<F::Output, Error>;
124
125    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126        self.owner.user_data().waker.register(cx.waker());
127        if self.evid.is_none() {
128            self.start()?;
129            return Poll::Pending;
130        }
131        let this = self.project();
132        let proc = this.owner.user_data().process.lock().unwrap();
133        let func = unsafe { &mut *this.state.get() };
134        let poll = match func.pop() {
135            Some(res) => Poll::Ready(Some(res)),
136            None => Poll::Pending,
137        };
138        drop(proc);
139        poll
140    }
141}
142
143#[pinned_drop]
144impl<'a, F: Queue> PinnedDrop for Subscription<'a, F> {
145    #[allow(clippy::needless_lifetimes)]
146    fn drop(self: Pin<&mut Self>) {
147        let mut proc = self.owner.user_data().process.lock().unwrap();
148        proc.change_id();
149        proc.data = ptr::null_mut();
150        if let Some(evid) = self.evid {
151            self.owner.context().with(|| unsafe {
152                result_from_raw(sys::ca_clear_subscription(evid)).unwrap();
153            });
154        }
155        drop(proc);
156    }
157}
158
159/// Subscription queue that stores only last received value, applying `F` to it.
160///
161/// `F` applied to all received values. If `F` returned `None` the new value will not overwrite previous value.
162pub struct LastFn<I, O, F = fn(Result<&I, Error>) -> Option<Result<O, Error>>>
163where
164    I: ReadRequest + ?Sized,
165    O: Send,
166    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
167{
168    func: F,
169    last: Option<Result<O, Error>>,
170    _p: PhantomData<I>,
171}
172
173impl<I, O, F> LastFn<I, O, F>
174where
175    I: ReadRequest + ?Sized,
176    O: Send,
177    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
178{
179    pub(crate) fn new(f: F) -> Self {
180        Self {
181            func: f,
182            last: None,
183            _p: PhantomData,
184        }
185    }
186}
187
188impl<I, O, F> Queue for LastFn<I, O, F>
189where
190    I: ReadRequest + ?Sized,
191    O: Send,
192    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
193{
194    type Request = I;
195    type Output = O;
196    fn push(&mut self, input: Result<&Self::Request, Error>) {
197        if let Some(output) = (self.func)(input) {
198            self.last = Some(output);
199        }
200    }
201    fn pop(&mut self) -> Option<Result<Self::Output, Error>> {
202        self.last.take()
203    }
204}
205
206/// Subscription queue that stores all received values, applying `F` to them.
207///
208/// If `F` returned `None` the value is filtered out.
209pub struct QueueFn<I, O, F = fn(Result<&I, Error>) -> Option<Result<O, Error>>>
210where
211    I: ReadRequest + ?Sized,
212    O: Send,
213    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
214{
215    func: F,
216    queue: VecDeque<Result<O, Error>>,
217    _p: PhantomData<I>,
218}
219
220impl<I, O, F> QueueFn<I, O, F>
221where
222    I: ReadRequest + ?Sized,
223    O: Send,
224    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
225{
226    pub(crate) fn new(f: F) -> Self {
227        Self {
228            func: f,
229            queue: VecDeque::new(),
230            _p: PhantomData,
231        }
232    }
233}
234
235impl<I, O, F> Queue for QueueFn<I, O, F>
236where
237    I: ReadRequest + ?Sized,
238    O: Send,
239    F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
240{
241    type Request = I;
242    type Output = O;
243    fn push(&mut self, input: Result<&Self::Request, Error>) {
244        if let Some(output) = (self.func)(input) {
245            self.queue.push_back(output);
246        }
247    }
248    fn pop(&mut self) -> Option<Result<Self::Output, Error>> {
249        self.queue.pop_front()
250    }
251}