cog_task/action/core/
repeat.rs

1use crate::action::{Action, ActionSignal, Props, StatefulAction, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, ResourceAddr, ResourceManager};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use eframe::egui::Ui;
6use eyre::{eyre, Context, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeSet, VecDeque};
9use std::sync::mpsc::{self, RecvError, Sender};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13#[derive(Debug, Deserialize, Serialize)]
14pub struct Repeat(
15    Box<dyn Action>,
16    #[serde(default = "defaults::prefetch")] usize,
17);
18
19stateful!(Repeat {
20    inner: Box<dyn StatefulAction>,
21    queue: Arc<Mutex<VecDeque<Box<dyn StatefulAction>>>>,
22    link: Sender<()>,
23});
24
25mod defaults {
26    pub fn prefetch() -> usize {
27        2
28    }
29}
30
31impl Action for Repeat {
32    fn in_signals(&self) -> BTreeSet<SignalId> {
33        self.0.in_signals()
34    }
35
36    fn out_signals(&self) -> BTreeSet<SignalId> {
37        self.0.out_signals()
38    }
39
40    fn resources(&self, config: &Config) -> Vec<ResourceAddr> {
41        self.0.resources(config)
42    }
43
44    fn stateful(
45        &self,
46        io: &IoManager,
47        res: &ResourceManager,
48        config: &Config,
49        sync_writer: &QWriter<SyncSignal>,
50        async_writer: &QWriter<AsyncSignal>,
51    ) -> Result<Box<dyn StatefulAction>> {
52        let (tx, rx) = mpsc::channel();
53
54        let mut queue = VecDeque::new();
55        for _ in 0..self.1 {
56            queue.push_back(
57                self.0
58                    .stateful(io, res, config, sync_writer, async_writer)?,
59            );
60        }
61
62        let queue = Arc::new(Mutex::new(queue));
63
64        {
65            let queue = queue.clone();
66            let blueprint =
67                serde_cbor::to_vec(&self.0).wrap_err("Failed to serialize action blueprint.")?;
68
69            let res = res.clone();
70            let config = config.clone();
71            let mut sync_writer = sync_writer.clone();
72            let async_writer = async_writer.clone();
73
74            thread::spawn(move || {
75                let io = match IoManager::new(&config)
76                    .wrap_err("Failed to create new IoManager for prefetcher.")
77                {
78                    Ok(io) => io,
79                    Err(e) => {
80                        sync_writer.push(SyncSignal::Error(e));
81                        return;
82                    }
83                };
84
85                let blueprint: Box<dyn Action> = match serde_cbor::from_slice(&blueprint)
86                    .wrap_err("Failed to deserialize action blueprint.")
87                {
88                    Ok(v) => v,
89                    Err(e) => {
90                        sync_writer.push(SyncSignal::Error(e));
91                        return;
92                    }
93                };
94
95                loop {
96                    if let Err(RecvError) = rx.recv() {
97                        break;
98                    } else {
99                        match blueprint
100                            .stateful(&io, &res, &config, &sync_writer, &async_writer)
101                            .wrap_err("Failed to prefetch inner stateful action for Repeat.")
102                        {
103                            Ok(inner) => {
104                                queue.lock().unwrap().push_back(inner);
105                            }
106                            Err(e) => {
107                                sync_writer.push(SyncSignal::Error(e));
108                                break;
109                            }
110                        }
111                    }
112                }
113            });
114        }
115
116        Ok(Box::new(StatefulRepeat {
117            done: false,
118            inner: self
119                .0
120                .stateful(io, res, config, sync_writer, async_writer)?,
121            queue,
122            link: tx,
123        }))
124    }
125}
126
127impl StatefulAction for StatefulRepeat {
128    impl_stateful!();
129
130    fn props(&self) -> Props {
131        (self.inner.props().bits() | INFINITE).into()
132    }
133
134    fn start(
135        &mut self,
136        sync_writer: &mut QWriter<SyncSignal>,
137        async_writer: &mut QWriter<AsyncSignal>,
138        state: &State,
139    ) -> Result<Signal> {
140        self.inner.start(sync_writer, async_writer, state)
141    }
142
143    fn update(
144        &mut self,
145        signal: &ActionSignal,
146        sync_writer: &mut QWriter<SyncSignal>,
147        async_writer: &mut QWriter<AsyncSignal>,
148        state: &State,
149    ) -> Result<Signal> {
150        let mut news = vec![];
151        news.extend(
152            self.inner
153                .update(signal, sync_writer, async_writer, state)?,
154        );
155
156        if self.inner.is_over()? {
157            if let Some(inner) = self.queue.lock().unwrap().pop_front() {
158                self.inner = inner;
159                news.extend(self.inner.start(sync_writer, async_writer, state)?);
160            } else {
161                return Err(eyre!(
162                    "Failed to immediately restart action. Try increasing prefetch queue size."
163                ));
164            }
165
166            if self.link.send(()).is_err() {
167                return Err(eyre!("Action prefetcher did not respond to request."));
168            }
169        }
170
171        Ok(news.into())
172    }
173
174    fn show(
175        &mut self,
176        ui: &mut Ui,
177        sync_writer: &mut QWriter<SyncSignal>,
178        async_writer: &mut QWriter<AsyncSignal>,
179        state: &State,
180    ) -> Result<()> {
181        self.inner.show(ui, sync_writer, async_writer, state)
182    }
183
184    fn stop(
185        &mut self,
186        sync_writer: &mut QWriter<SyncSignal>,
187        async_writer: &mut QWriter<AsyncSignal>,
188        state: &State,
189    ) -> Result<Signal> {
190        self.inner.stop(sync_writer, async_writer, state)
191    }
192}