epics_ca/channel/
typed.rs

1use super::{
2    get::Callback,
3    subscribe::{LastFn, Queue, QueueFn},
4    Channel, Get, GetFn, Put, Subscription,
5};
6use crate::{
7    error::{self, Error},
8    request::{ReadRequest, Request, TypedRequest, WriteRequest},
9    types::{Field, Value},
10};
11use derive_more::{Deref, DerefMut, Into};
12use std::{
13    any::type_name,
14    fmt::{self, Debug},
15    marker::PhantomData,
16};
17
18impl Channel {
19    fn check_type<V: Value + ?Sized>(&self) -> Result<(), Error> {
20        if <V::Item as Field>::ID != self.field_type()? {
21            Err(error::BADTYPE)
22        } else if !V::check_len(self.element_count()?) {
23            Err(error::BADCOUNT)
24        } else {
25            Ok(())
26        }
27    }
28
29    /// Convert into [`TypedChannel`].
30    ///
31    /// Conversion is successful if actual channel type matches the one passed as a parameter `V`.
32    pub fn into_typed<V: Value + ?Sized>(self) -> Result<TypedChannel<V>, (Error, Self)> {
33        match self.check_type::<V>() {
34            Ok(()) => Ok(TypedChannel::new_unchecked(self)),
35            Err(err) => Err((err, self)),
36        }
37    }
38}
39
40/// Typed channel.
41///
42/// Used to make typed requests, e.g. such requests that contains typed value.
43#[repr(transparent)]
44#[derive(Deref, DerefMut, Into)]
45pub struct TypedChannel<V: Value + ?Sized> {
46    #[deref]
47    #[deref_mut]
48    pub(crate) base: Channel,
49    #[into(ignore)]
50    _p: PhantomData<V>,
51}
52
53impl<V: Value + ?Sized> TypedChannel<V> {
54    /// Convert [`Channel`] to [`TypedChannel<V>`] without type checking.
55    ///
56    /// It is safe because the type of remote channel can change at any moment and checks are done during reading/writing/monitoring anyway.
57    ///
58    /// If you want to check type before converting use [`Channel::into_typed`].
59    pub fn new_unchecked(base: Channel) -> Self {
60        Self {
61            base,
62            _p: PhantomData,
63        }
64    }
65}
66
67impl<V: Value + ?Sized> Debug for TypedChannel<V> {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        write!(f, "TypedChannel<{}>({:?})", type_name::<V>(), self.raw())
70    }
71}
72
73impl<V: Value + ?Sized> TypedChannel<V> {
74    /// Make write request by reference.
75    pub fn put_ref<R>(&mut self, req: &R) -> Result<Put<'_>, Error>
76    where
77        R: TypedRequest<Value = V> + WriteRequest + ?Sized,
78    {
79        self.base.put_ref::<R>(req)
80    }
81
82    /// Make read request and call closure when it's done, successfully or not.
83    pub fn get_with<R, F>(&mut self, func: F) -> Get<'_, F>
84    where
85        R: TypedRequest<Value = V> + ReadRequest + ?Sized,
86        F: Callback<Request = R>,
87    {
88        self.base.get_with(func)
89    }
90
91    /// Subscribe to channel updates and call closure each time when update occured.
92    pub fn subscribe_with<F: Queue>(&mut self, func: F) -> Subscription<'_, F>
93    where
94        F::Request: TypedRequest<Value = V> + ReadRequest,
95    {
96        Subscription::new(self, func)
97    }
98}
99
100impl<T: Field> TypedChannel<[T]> {
101    /// Make read request and obtain boxed response.
102    pub fn get_boxed<R>(&mut self) -> Get<'_, GetFn<R, Box<R>>>
103    where
104        R: TypedRequest<Value = [T]> + ReadRequest + ?Sized,
105    {
106        self.get_with(GetFn::<R, Box<R>>::new(clone_boxed::<R>))
107    }
108
109    /// Subscribe to channel updates and obtain stream that provides boxed responses.
110    pub fn subscribe_boxed<R>(&mut self) -> Subscription<'_, LastFn<R, Box<R>>>
111    where
112        R: TypedRequest<Value = [T]> + ReadRequest + ?Sized,
113    {
114        self.subscribe_with(LastFn::<R, Box<R>>::new(clone_boxed_some::<R>))
115    }
116}
117
118impl<T: Field> TypedChannel<T> {
119    /// Write scalar request.
120    pub fn put<R>(&mut self, req: R) -> Result<Put<'_>, Error>
121    where
122        R: TypedRequest<Value = T> + WriteRequest,
123    {
124        self.put_ref::<R>(&req)
125    }
126
127    /// Get result of scalar read request.
128    pub fn get<R>(&mut self) -> Get<'_, GetFn<R, R>>
129    where
130        R: TypedRequest<Value = T> + ReadRequest + Copy,
131    {
132        self.get_with(GetFn::<R, R>::new(copy::<R>))
133    }
134
135    /// Subscribe to updates of scalar channel.
136    ///
137    /// Note, that returned stream stores only last unread value.
138    /// To store all values use [`Self::subscribe_buffered`].
139    pub fn subscribe<R>(&mut self) -> Subscription<'_, LastFn<R, R>>
140    where
141        R: TypedRequest<Value = T> + ReadRequest + Copy,
142    {
143        self.subscribe_with(LastFn::<R, R>::new(copy_some::<R>))
144    }
145
146    /// Subscribe to updates of scalar channel and store all updates.
147    ///
148    /// This subscription contains internal buffer that can grow up to arbitrary size
149    /// especially in case of frequent channel updates.
150    pub fn subscribe_buffered<R>(&mut self) -> Subscription<'_, QueueFn<R, R>>
151    where
152        R: TypedRequest<Value = T> + ReadRequest + Copy,
153    {
154        self.subscribe_with(QueueFn::<R, R>::new(copy_some::<R>))
155    }
156}
157
158fn clone_boxed<R: Request + ?Sized>(input: Result<&R, Error>) -> Result<Box<R>, Error> {
159    input.map(|req| req.clone_boxed())
160}
161
162fn clone_boxed_some<R: Request + ?Sized>(
163    input: Result<&R, Error>,
164) -> Option<Result<Box<R>, Error>> {
165    Some(input.map(|req| req.clone_boxed()))
166}
167
168fn copy<R: Copy>(input: Result<&R, Error>) -> Result<R, Error> {
169    input.copied()
170}
171
172fn copy_some<R: Copy>(input: Result<&R, Error>) -> Option<Result<R, Error>> {
173    Some(input.copied())
174}
175
176#[cfg(test)]
177mod tests {
178    use crate::{Channel, Context};
179    use async_std::test as async_test;
180    use cstr::cstr;
181    use serial_test::serial;
182
183    #[async_test]
184    #[serial]
185    async fn downcast() {
186        let ctx = Context::new().unwrap();
187        let mut base = Channel::new(&ctx, cstr!("ca:test:ai")).unwrap();
188        base.connected().await;
189        let base = base.into_typed::<u8>().unwrap_err().1;
190        base.into_typed::<f64>().unwrap();
191    }
192}