epics_ca/channel/
value.rs

1use super::{
2    get::Callback,
3    subscribe::{LastFn, Queue, QueueFn},
4    typed::TypedChannel,
5    Get, GetFn, Put, Subscription,
6};
7use crate::{
8    error::Error,
9    request::Request,
10    types::{Field, Value},
11};
12use derive_more::{Deref, DerefMut, From, Into};
13use std::{
14    any::type_name,
15    fmt::{self, Debug},
16};
17
18impl<V: Value + ?Sized> TypedChannel<V> {
19    pub fn into_value(self) -> ValueChannel<V> {
20        ValueChannel::from(self)
21    }
22}
23
24/// Channel used to read and write only value rather than other requests.
25#[repr(transparent)]
26#[derive(From, Into, Deref, DerefMut)]
27pub struct ValueChannel<V: Value + ?Sized> {
28    pub(crate) typed: TypedChannel<V>,
29}
30
31impl<V: Value + ?Sized> Debug for ValueChannel<V> {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        write!(f, "ValueChannel<{}>({:?})", type_name::<V>(), self.raw())
34    }
35}
36
37impl<V: Value + ?Sized> ValueChannel<V> {
38    /// Write value by reference to the channel.
39    pub fn put_ref(&mut self, data: &V) -> Result<Put<'_>, Error> {
40        self.typed.put_ref::<V>(data)
41    }
42
43    /// Request value from the channel and call callback when it's done.
44    pub fn get_with<F>(&mut self, func: F) -> Get<'_, F>
45    where
46        F: Callback<Request = V>,
47    {
48        self.typed.get_with(func)
49    }
50
51    /// Subscribe to value updates and call closure each time when update occured.
52    pub fn subscribe_with<F: Queue<Request = V>>(&mut self, func: F) -> Subscription<'_, F> {
53        self.typed.subscribe_with(func)
54    }
55}
56
57impl<T: Field> ValueChannel<[T]> {
58    /// Request array value and store it in [`Vec`].
59    pub fn get_vec(&mut self) -> Get<'_, GetFn<[T], Vec<T>>> {
60        self.get_with(GetFn::<[T], Vec<T>>::new(clone_vec::<T>))
61    }
62
63    /// Write value to slice and return received value length (which may be greater than `dst` length).
64    pub fn get_to_slice<'a, 'b>(&'a mut self, dst: &'b mut [T]) -> Get<'a, GetToSlice<'b, T>> {
65        self.get_with(GetToSlice { dst })
66    }
67
68    /// Subscribe to array value updates and obtain [`Vec`] stream.
69    pub fn subscribe_vec(&mut self) -> Subscription<'_, LastFn<[T], Vec<T>>> {
70        self.subscribe_with(LastFn::<[T], Vec<T>>::new(clone_vec_some::<T>))
71    }
72}
73
74impl<T: Field> ValueChannel<T> {
75    /// Write scalar value.
76    pub fn put(&mut self, val: T) -> Result<Put<'_>, Error> {
77        self.typed.put::<T>(val)
78    }
79
80    /// Get scalar value.
81    pub fn get(&mut self) -> Get<'_, GetFn<T, T>> {
82        self.typed.get::<T>()
83    }
84
85    /// Subscribe to updates of scalar value.
86    ///
87    /// See [`TypedChannel::subscribe`].
88    pub fn subscribe(&mut self) -> Subscription<'_, LastFn<T, T>> {
89        self.typed.subscribe::<T>()
90    }
91
92    /// Subscribe to updates of scalar value and store all updates.
93    ///
94    /// See [`TypedChannel::subscribe_buffered`].
95    pub fn subscribe_buffered(&mut self) -> Subscription<'_, QueueFn<T, T>> {
96        self.typed.subscribe_buffered::<T>()
97    }
98}
99
100fn clone_vec<T: Field>(input: Result<&[T], Error>) -> Result<Vec<T>, Error> {
101    input.map(|data| Vec::from(data.clone_boxed()))
102}
103
104fn clone_vec_some<T: Field>(input: Result<&[T], Error>) -> Option<Result<Vec<T>, Error>> {
105    Some(input.map(|data| Vec::from(data.clone_boxed())))
106}
107
108pub struct GetToSlice<'a, T: Field> {
109    dst: &'a mut [T],
110}
111
112impl<'a, T: Field> Callback for GetToSlice<'a, T> {
113    type Request = [T];
114    type Output = usize;
115    fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error> {
116        input.map(|src| {
117            let len = usize::min(self.dst.len(), src.len());
118            self.dst[..len].copy_from_slice(&src[..len]);
119            src.len()
120        })
121    }
122}
123
#[cfg(test)]
mod tests {
    // NOTE(review): these tests connect to the `ca:test:*` channels, so they presumably
    // need a running test IOC that provides them — confirm against the test setup.
    use crate::Context;
    use async_std::{task::sleep, test as async_test};
    use cstr::cstr;
    use futures::{join, pin_mut, StreamExt};
    use serial_test::serial;
    use std::{f64::consts::PI, time::Duration};

    /// A scalar written to `ca:test:ao` is expected to be read back from `ca:test:ai`.
    #[async_test]
    #[serial]
    async fn put_get_scalar() {
        let ctx = Context::new().unwrap();

        let mut writer = ctx.connect::<f64>(cstr!("ca:test:ao")).await.unwrap();
        writer.put(PI).unwrap().await.unwrap();

        let mut reader = ctx.connect::<f64>(cstr!("ca:test:ai")).await.unwrap();
        let received = reader.get().await.unwrap();
        assert_eq!(received, PI);
    }

    /// Every scalar written to `ca:test:ao` is expected to arrive, in order, through a
    /// buffered subscription on `ca:test:ai`, even though the reader is slower than the writer.
    #[async_test]
    #[serial]
    async fn subscribe_buffered() {
        let ctx = Context::new().unwrap();

        let mut writer = ctx.connect::<f64>(cstr!("ca:test:ao")).await.unwrap();
        let mut reader = ctx.connect::<f64>(cstr!("ca:test:ai")).await.unwrap();

        writer.put(0.0).unwrap().await.unwrap();
        let monitor = reader.subscribe_buffered();
        pin_mut!(monitor);
        assert_eq!(monitor.next().await.unwrap().unwrap(), 0.0);

        let count = 0x10;
        let values: Vec<f64> = (1..=count).map(|i| i as f64 / 16.0).collect();

        let produce = async {
            for &x in &values {
                writer.put(x).unwrap().await.unwrap();
            }
        };
        let consume = async {
            for &x in &values {
                assert_eq!(monitor.next().await.unwrap().unwrap(), x);
                sleep(Duration::from_millis(10)).await;
            }
        };
        join!(produce, consume);
    }

    /// An array written to `ca:test:aao` is expected to be read back from `ca:test:aai`.
    #[async_test]
    #[serial]
    async fn put_get_array() {
        let ctx = Context::new().unwrap();

        let mut writer = ctx.connect::<[i32]>(cstr!("ca:test:aao")).await.unwrap();
        let mut reader = ctx.connect::<[i32]>(cstr!("ca:test:aai")).await.unwrap();

        let data: Vec<i32> = (0..8).collect();
        writer.put_ref(&data).unwrap().await.unwrap();
        let received = reader.get_vec().await.unwrap();
        assert_eq!(received, data);
    }

    /// Arrays of growing length written to `ca:test:aao` are expected to arrive one by one
    /// through a subscription on `ca:test:aai`.
    #[async_test]
    #[serial]
    async fn subscribe_array() {
        let ctx = Context::new().unwrap();

        let mut writer = ctx.connect::<[i32]>(cstr!("ca:test:aao")).await.unwrap();
        let mut reader = ctx.connect::<[i32]>(cstr!("ca:test:aai")).await.unwrap();

        writer.put_ref(&[-1]).unwrap().await.unwrap();
        let monitor = reader.subscribe_vec();
        pin_mut!(monitor);
        assert_eq!(monitor.next().await.unwrap().unwrap(), [-1]);

        let count = 0x10;
        for len in 1..=count {
            let data: Vec<i32> = (0..len).collect();
            writer.put_ref(&data).unwrap().await.unwrap();
            let received = monitor.next().await.unwrap().unwrap();
            assert_eq!(received, data);
        }
    }
}