1use super::{base::UserData, Channel};
2use crate::{
3 error::{result_from_raw, Error},
4 request::{ReadRequest, Request},
5 types::RequestId,
6};
7use pin_project::{pin_project, pinned_drop};
8use std::{
9 cell::UnsafeCell,
10 future::Future,
11 marker::{PhantomData, PhantomPinned},
12 mem,
13 pin::Pin,
14 ptr,
15 task::{Context, Poll},
16};
17
18pub trait Callback: Send {
20 type Request: ReadRequest + ?Sized;
21 type Output: Send;
22
23 fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error>;
25}
26
27pub(crate) enum GetState<F: Callback> {
28 Empty,
29 Pending(F),
30 Ready(Result<F::Output, Error>),
31}
32
33#[must_use]
35#[pin_project(PinnedDrop)]
36pub struct Get<'a, F: Callback> {
37 owner: &'a mut Channel,
38 state: UnsafeCell<GetState<F>>,
40 started: bool,
41 #[pin]
42 _pp: PhantomPinned,
43}
44
45impl<'a, F: Callback> Get<'a, F> {
46 pub(crate) fn new(owner: &'a mut Channel, func: F) -> Self {
47 Self {
48 owner,
49 state: UnsafeCell::new(GetState::Pending(func)),
50 started: false,
51 _pp: PhantomPinned,
52 }
53 }
54
55 pub fn start(self: Pin<&mut Self>) -> Result<(), Error> {
60 assert!(!self.started);
61 let this = self.project();
62 let owner = this.owner;
63 owner.context().with(|| {
64 let mut proc = owner.user_data().process.lock().unwrap();
65 proc.data = this.state.get() as *mut u8;
66 result_from_raw(unsafe {
67 sys::ca_array_get_callback(
68 F::Request::ID.raw() as _,
69 0,
70 owner.raw(),
71 Some(Self::callback),
72 proc.id() as _,
73 )
74 })
75 .map(|()| {
76 owner.context().flush_io();
77 *this.started = true
78 })
79 })
80 }
81
82 unsafe extern "C" fn callback(args: sys::event_handler_args) {
83 let user_data = &*(sys::ca_puser(args.chid) as *const UserData);
84 let proc = user_data.process.lock().unwrap();
85 if proc.id() != args.usr as usize {
86 return;
87 }
88 let state = &mut *(proc.data as *mut GetState<F>);
89 let func = match mem::replace(state, GetState::Empty) {
90 GetState::Pending(func) => func,
91 _ => unreachable!(),
92 };
93 *state = GetState::Ready(func.apply(result_from_raw(args.status).and_then(|()| {
94 F::Request::from_ptr(
95 args.dbr as *const u8,
96 RequestId::try_from_raw(args.type_ as _).unwrap(),
97 args.count as usize,
98 )
99 })));
100 user_data.waker.wake();
101 }
102}
103
104impl<'a, F: Callback> Future for Get<'a, F> {
105 type Output = Result<F::Output, Error>;
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 self.owner.user_data().waker.register(cx.waker());
108 if !self.started {
109 self.start()?;
110 return Poll::Pending;
111 }
112 let this = self.project();
113 let proc = this.owner.user_data().process.lock().unwrap();
114 let state = unsafe { &mut *this.state.get() };
115 let poll = match mem::replace(state, GetState::Empty) {
116 GetState::Empty => unreachable!(),
117 GetState::Pending(func) => {
118 *state = GetState::Pending(func);
119 Poll::Pending
120 }
121 GetState::Ready(res) => match res {
122 Ok(ret) => Poll::Ready(Ok(ret)),
123 Err(err) => Poll::Ready(Err(err)),
124 },
125 };
126 drop(proc);
127 poll
128 }
129}
130
131#[pinned_drop]
132impl<'a, F: Callback> PinnedDrop for Get<'a, F> {
133 #[allow(clippy::needless_lifetimes)]
134 fn drop(self: Pin<&mut Self>) {
135 let mut proc = self.owner.user_data().process.lock().unwrap();
136 proc.change_id();
137 proc.data = ptr::null_mut();
138 }
139}
140
141pub struct GetFn<R, O, F = fn(Result<&R, Error>) -> Result<O, Error>>
143where
144 R: ReadRequest + ?Sized,
145 O: Send,
146 F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
147{
148 func: F,
149 _p: PhantomData<(*const R, O)>,
150}
151
152impl<R, O, F> GetFn<R, O, F>
153where
154 R: ReadRequest + ?Sized,
155 O: Send,
156 F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
157{
158 pub(crate) fn new(f: F) -> Self {
159 Self {
160 func: f,
161 _p: PhantomData,
162 }
163 }
164}
165
166unsafe impl<R, O, F> Send for GetFn<R, O, F>
167where
168 R: ReadRequest + ?Sized,
169 O: Send,
170 F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
171{
172}
173
174impl<R, O, F> Callback for GetFn<R, O, F>
175where
176 R: ReadRequest + ?Sized,
177 O: Send,
178 F: FnOnce(Result<&R, Error>) -> Result<O, Error> + Send,
179{
180 type Request = R;
181 type Output = O;
182 fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error> {
183 (self.func)(input)
184 }
185}