1use super::{base::UserData, Channel};
2use crate::{
3 error::{result_from_raw, Error},
4 request::{ReadRequest, Request},
5 types::{EventMask, RequestId},
6};
7use futures::Stream;
8use pin_project::{pin_project, pinned_drop};
9use std::{
10 cell::UnsafeCell,
11 collections::VecDeque,
12 marker::{PhantomData, PhantomPinned},
13 pin::Pin,
14 ptr,
15 task::{Context, Poll},
16};
17
18pub trait Queue: Send {
20 type Request: ReadRequest + ?Sized;
21 type Output: Send + Sized;
22
23 fn push(&mut self, input: Result<&Self::Request, Error>);
25 fn pop(&mut self) -> Option<Result<Self::Output, Error>>;
27}
28
29#[must_use]
37#[pin_project(PinnedDrop)]
38pub struct Subscription<'a, F: Queue> {
39 owner: &'a mut Channel,
40 state: UnsafeCell<F>,
42 mask: EventMask,
43 evid: Option<sys::evid>,
44 #[pin]
45 _pp: PhantomPinned,
46}
47
48unsafe impl<'a, F: Queue> Send for Subscription<'a, F> {}
49
50impl<'a, F: Queue> Subscription<'a, F> {
51 pub(crate) fn new(owner: &'a mut Channel, func: F) -> Self {
52 Self {
53 owner,
54 state: UnsafeCell::new(func),
55 mask: EventMask::VALUE | EventMask::ALARM,
56 evid: None,
57 _pp: PhantomPinned,
58 }
59 }
60
61 pub fn set_event_mask(&mut self, mask: EventMask) {
68 self.mask = mask;
69 }
70
71 pub fn start(self: Pin<&mut Self>) -> Result<(), Error> {
78 assert!(self.evid.is_none());
79 let this = self.project();
80 let owner = this.owner;
81 owner.context().with(|| {
82 let mut proc = owner.user_data().process.lock().unwrap();
83 proc.data = this.state.get() as *mut u8;
84 let mut evid: sys::evid = ptr::null_mut();
85 result_from_raw(unsafe {
86 sys::ca_create_subscription(
87 F::Request::ID.raw() as _,
88 0,
89 owner.raw(),
90 this.mask.raw() as _,
91 Some(Self::callback),
92 proc.id() as _,
93 &mut evid as *mut sys::evid,
94 )
95 })
96 .map(|()| {
97 owner.context().flush_io();
98 *this.evid = Some(evid);
99 })
100 })
101 }
102
103 unsafe extern "C" fn callback(args: sys::event_handler_args) {
104 let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
105 let proc = user_data.process.lock().unwrap();
106 if proc.id() != args.usr as usize {
107 return;
108 }
109 let func = &mut *(proc.data as *mut F);
110 func.push(result_from_raw(args.status).and_then(|()| {
111 F::Request::from_ptr(
112 args.dbr as *const u8,
113 RequestId::try_from_raw(args.type_ as _).unwrap(),
114 args.count as usize,
115 )
116 }));
117 drop(proc);
118 user_data.waker.wake();
119 }
120}
121
122impl<'a, F: Queue> Stream for Subscription<'a, F> {
123 type Item = Result<F::Output, Error>;
124
125 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 self.owner.user_data().waker.register(cx.waker());
127 if self.evid.is_none() {
128 self.start()?;
129 return Poll::Pending;
130 }
131 let this = self.project();
132 let proc = this.owner.user_data().process.lock().unwrap();
133 let func = unsafe { &mut *this.state.get() };
134 let poll = match func.pop() {
135 Some(res) => Poll::Ready(Some(res)),
136 None => Poll::Pending,
137 };
138 drop(proc);
139 poll
140 }
141}
142
143#[pinned_drop]
144impl<'a, F: Queue> PinnedDrop for Subscription<'a, F> {
145 #[allow(clippy::needless_lifetimes)]
146 fn drop(self: Pin<&mut Self>) {
147 let mut proc = self.owner.user_data().process.lock().unwrap();
148 proc.change_id();
149 proc.data = ptr::null_mut();
150 if let Some(evid) = self.evid {
151 self.owner.context().with(|| unsafe {
152 result_from_raw(sys::ca_clear_subscription(evid)).unwrap();
153 });
154 }
155 drop(proc);
156 }
157}
158
159pub struct LastFn<I, O, F = fn(Result<&I, Error>) -> Option<Result<O, Error>>>
163where
164 I: ReadRequest + ?Sized,
165 O: Send,
166 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
167{
168 func: F,
169 last: Option<Result<O, Error>>,
170 _p: PhantomData<I>,
171}
172
173impl<I, O, F> LastFn<I, O, F>
174where
175 I: ReadRequest + ?Sized,
176 O: Send,
177 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
178{
179 pub(crate) fn new(f: F) -> Self {
180 Self {
181 func: f,
182 last: None,
183 _p: PhantomData,
184 }
185 }
186}
187
188impl<I, O, F> Queue for LastFn<I, O, F>
189where
190 I: ReadRequest + ?Sized,
191 O: Send,
192 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
193{
194 type Request = I;
195 type Output = O;
196 fn push(&mut self, input: Result<&Self::Request, Error>) {
197 if let Some(output) = (self.func)(input) {
198 self.last = Some(output);
199 }
200 }
201 fn pop(&mut self) -> Option<Result<Self::Output, Error>> {
202 self.last.take()
203 }
204}
205
206pub struct QueueFn<I, O, F = fn(Result<&I, Error>) -> Option<Result<O, Error>>>
210where
211 I: ReadRequest + ?Sized,
212 O: Send,
213 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
214{
215 func: F,
216 queue: VecDeque<Result<O, Error>>,
217 _p: PhantomData<I>,
218}
219
220impl<I, O, F> QueueFn<I, O, F>
221where
222 I: ReadRequest + ?Sized,
223 O: Send,
224 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
225{
226 pub(crate) fn new(f: F) -> Self {
227 Self {
228 func: f,
229 queue: VecDeque::new(),
230 _p: PhantomData,
231 }
232 }
233}
234
235impl<I, O, F> Queue for QueueFn<I, O, F>
236where
237 I: ReadRequest + ?Sized,
238 O: Send,
239 F: FnMut(Result<&I, Error>) -> Option<Result<O, Error>> + Send,
240{
241 type Request = I;
242 type Output = O;
243 fn push(&mut self, input: Result<&Self::Request, Error>) {
244 if let Some(output) = (self.func)(input) {
245 self.queue.push_back(output);
246 }
247 }
248 fn pop(&mut self) -> Option<Result<Self::Output, Error>> {
249 self.queue.pop_front()
250 }
251}