epics_ca/channel/
value.rs1use super::{
2 get::Callback,
3 subscribe::{LastFn, Queue, QueueFn},
4 typed::TypedChannel,
5 Get, GetFn, Put, Subscription,
6};
7use crate::{
8 error::Error,
9 request::Request,
10 types::{Field, Value},
11};
12use derive_more::{Deref, DerefMut, From, Into};
13use std::{
14 any::type_name,
15 fmt::{self, Debug},
16};
17
18impl<V: Value + ?Sized> TypedChannel<V> {
19 pub fn into_value(self) -> ValueChannel<V> {
20 ValueChannel::from(self)
21 }
22}
23
/// Thin wrapper around a [`TypedChannel`] for values of type `V`.
///
/// The derives make it convertible from and into the wrapped
/// [`TypedChannel`], and let it dereference (mutably or not) to it.
/// `#[repr(transparent)]` gives it the same layout as its single field.
#[repr(transparent)]
#[derive(From, Into, Deref, DerefMut)]
pub struct ValueChannel<V: Value + ?Sized> {
    /// The wrapped typed channel that all requests are delegated to.
    pub(crate) typed: TypedChannel<V>,
}
30
31impl<V: Value + ?Sized> Debug for ValueChannel<V> {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 write!(f, "ValueChannel<{}>({:?})", type_name::<V>(), self.raw())
34 }
35}
36
37impl<V: Value + ?Sized> ValueChannel<V> {
38 pub fn put_ref(&mut self, data: &V) -> Result<Put<'_>, Error> {
40 self.typed.put_ref::<V>(data)
41 }
42
43 pub fn get_with<F>(&mut self, func: F) -> Get<'_, F>
45 where
46 F: Callback<Request = V>,
47 {
48 self.typed.get_with(func)
49 }
50
51 pub fn subscribe_with<F: Queue<Request = V>>(&mut self, func: F) -> Subscription<'_, F> {
53 self.typed.subscribe_with(func)
54 }
55}
56
57impl<T: Field> ValueChannel<[T]> {
58 pub fn get_vec(&mut self) -> Get<'_, GetFn<[T], Vec<T>>> {
60 self.get_with(GetFn::<[T], Vec<T>>::new(clone_vec::<T>))
61 }
62
63 pub fn get_to_slice<'a, 'b>(&'a mut self, dst: &'b mut [T]) -> Get<'a, GetToSlice<'b, T>> {
65 self.get_with(GetToSlice { dst })
66 }
67
68 pub fn subscribe_vec(&mut self) -> Subscription<'_, LastFn<[T], Vec<T>>> {
70 self.subscribe_with(LastFn::<[T], Vec<T>>::new(clone_vec_some::<T>))
71 }
72}
73
74impl<T: Field> ValueChannel<T> {
75 pub fn put(&mut self, val: T) -> Result<Put<'_>, Error> {
77 self.typed.put::<T>(val)
78 }
79
80 pub fn get(&mut self) -> Get<'_, GetFn<T, T>> {
82 self.typed.get::<T>()
83 }
84
85 pub fn subscribe(&mut self) -> Subscription<'_, LastFn<T, T>> {
89 self.typed.subscribe::<T>()
90 }
91
92 pub fn subscribe_buffered(&mut self) -> Subscription<'_, QueueFn<T, T>> {
96 self.typed.subscribe_buffered::<T>()
97 }
98}
99
100fn clone_vec<T: Field>(input: Result<&[T], Error>) -> Result<Vec<T>, Error> {
101 input.map(|data| Vec::from(data.clone_boxed()))
102}
103
104fn clone_vec_some<T: Field>(input: Result<&[T], Error>) -> Option<Result<Vec<T>, Error>> {
105 Some(input.map(|data| Vec::from(data.clone_boxed())))
106}
107
/// Get callback that copies received array data into a caller-provided slice.
///
/// Constructed by `ValueChannel::<[T]>::get_to_slice`.
pub struct GetToSlice<'a, T: Field> {
    /// Destination buffer that the received elements are copied into.
    dst: &'a mut [T],
}
111
112impl<'a, T: Field> Callback for GetToSlice<'a, T> {
113 type Request = [T];
114 type Output = usize;
115 fn apply(self, input: Result<&Self::Request, Error>) -> Result<Self::Output, Error> {
116 input.map(|src| {
117 let len = usize::min(self.dst.len(), src.len());
118 self.dst[..len].copy_from_slice(&src[..len]);
119 src.len()
120 })
121 }
122}
123
// Integration tests for `ValueChannel`.
//
// NOTE(review): these tests connect to the PVs `ca:test:ao`, `ca:test:ai`,
// `ca:test:aao` and `ca:test:aai`, so they presumably need a running test IOC
// in which each input record mirrors the matching output record — confirm
// against the test database. All tests are marked `#[serial]`, presumably
// because they share those PVs.
#[cfg(test)]
mod tests {
    use crate::Context;
    use async_std::{task::sleep, test as async_test};
    use cstr::cstr;
    use futures::{join, pin_mut, StreamExt};
    use serial_test::serial;
    use std::{f64::consts::PI, time::Duration};

    // Writes a scalar to the output PV and reads the same value back from the
    // input PV.
    #[async_test]
    #[serial]
    async fn put_get_scalar() {
        let ctx = Context::new().unwrap();

        let mut output = ctx.connect::<f64>(cstr!("ca:test:ao")).await.unwrap();
        output.put(PI).unwrap().await.unwrap();

        let mut input = ctx.connect::<f64>(cstr!("ca:test:ai")).await.unwrap();
        assert_eq!(input.get().await.unwrap(), PI);
    }

    // Checks that a buffered subscription delivers every written value in
    // order while the writer runs concurrently with a slower reader.
    #[async_test]
    #[serial]
    async fn subscribe_buffered() {
        let ctx = Context::new().unwrap();

        let mut output = ctx.connect::<f64>(cstr!("ca:test:ao")).await.unwrap();
        let mut input = ctx.connect::<f64>(cstr!("ca:test:ai")).await.unwrap();

        // Establish a known initial value and consume the first update.
        output.put(0.0).unwrap().await.unwrap();
        let monitor = input.subscribe_buffered();
        pin_mut!(monitor);
        assert_eq!(monitor.next().await.unwrap().unwrap(), 0.0);

        let count = 0x10;
        // Values 1/16, 2/16, ..., 16/16.
        let values = (0..count)
            .map(|i| (i + 1) as f64 / 16.0)
            .collect::<Vec<_>>();
        join!(
            // Producer: writes all values back to back.
            async {
                for x in values.iter() {
                    output.put(*x).unwrap().await.unwrap();
                }
            },
            // Consumer: sleeps after each update so it lags behind the producer.
            async {
                for x in values.iter() {
                    assert_eq!(monitor.next().await.unwrap().unwrap(), *x);
                    sleep(Duration::from_millis(10)).await;
                }
            }
        );
    }

    // Writes an array to the output PV and reads it back as a `Vec`.
    #[async_test]
    #[serial]
    async fn put_get_array() {
        let ctx = Context::new().unwrap();

        let mut output = ctx.connect::<[i32]>(cstr!("ca:test:aao")).await.unwrap();
        let mut input = ctx.connect::<[i32]>(cstr!("ca:test:aai")).await.unwrap();

        let data = (0..8).collect::<Vec<i32>>();
        output.put_ref(&data).unwrap().await.unwrap();
        assert_eq!(input.get_vec().await.unwrap(), data);
    }

    // Checks that an array subscription delivers each written array, with the
    // array length growing from 1 to 16.
    #[async_test]
    #[serial]
    async fn subscribe_array() {
        let ctx = Context::new().unwrap();

        let mut output = ctx.connect::<[i32]>(cstr!("ca:test:aao")).await.unwrap();
        let mut input = ctx.connect::<[i32]>(cstr!("ca:test:aai")).await.unwrap();

        // Establish a known initial value and consume the first update.
        output.put_ref(&[-1]).unwrap().await.unwrap();
        let monitor = input.subscribe_vec();
        pin_mut!(monitor);
        assert_eq!(monitor.next().await.unwrap().unwrap(), [-1]);

        let count = 0x10;
        for i in 0..count {
            // Each write completes before the matching update is awaited.
            let data = (0..(i + 1)).collect::<Vec<_>>();
            output.put_ref(&data).unwrap().await.unwrap();
            assert_eq!(monitor.next().await.unwrap().unwrap(), data);
        }
    }
}