cog_task/action/core/
repeat.rs1use crate::action::{Action, ActionSignal, Props, StatefulAction, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, ResourceAddr, ResourceManager};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use eframe::egui::Ui;
6use eyre::{eyre, Context, Result};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeSet, VecDeque};
9use std::sync::mpsc::{self, RecvError, Sender};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13#[derive(Debug, Deserialize, Serialize)]
14pub struct Repeat(
15 Box<dyn Action>,
16 #[serde(default = "defaults::prefetch")] usize,
17);
18
19stateful!(Repeat {
20 inner: Box<dyn StatefulAction>,
21 queue: Arc<Mutex<VecDeque<Box<dyn StatefulAction>>>>,
22 link: Sender<()>,
23});
24
25mod defaults {
26 pub fn prefetch() -> usize {
27 2
28 }
29}
30
31impl Action for Repeat {
32 fn in_signals(&self) -> BTreeSet<SignalId> {
33 self.0.in_signals()
34 }
35
36 fn out_signals(&self) -> BTreeSet<SignalId> {
37 self.0.out_signals()
38 }
39
40 fn resources(&self, config: &Config) -> Vec<ResourceAddr> {
41 self.0.resources(config)
42 }
43
44 fn stateful(
45 &self,
46 io: &IoManager,
47 res: &ResourceManager,
48 config: &Config,
49 sync_writer: &QWriter<SyncSignal>,
50 async_writer: &QWriter<AsyncSignal>,
51 ) -> Result<Box<dyn StatefulAction>> {
52 let (tx, rx) = mpsc::channel();
53
54 let mut queue = VecDeque::new();
55 for _ in 0..self.1 {
56 queue.push_back(
57 self.0
58 .stateful(io, res, config, sync_writer, async_writer)?,
59 );
60 }
61
62 let queue = Arc::new(Mutex::new(queue));
63
64 {
65 let queue = queue.clone();
66 let blueprint =
67 serde_cbor::to_vec(&self.0).wrap_err("Failed to serialize action blueprint.")?;
68
69 let res = res.clone();
70 let config = config.clone();
71 let mut sync_writer = sync_writer.clone();
72 let async_writer = async_writer.clone();
73
74 thread::spawn(move || {
75 let io = match IoManager::new(&config)
76 .wrap_err("Failed to create new IoManager for prefetcher.")
77 {
78 Ok(io) => io,
79 Err(e) => {
80 sync_writer.push(SyncSignal::Error(e));
81 return;
82 }
83 };
84
85 let blueprint: Box<dyn Action> = match serde_cbor::from_slice(&blueprint)
86 .wrap_err("Failed to deserialize action blueprint.")
87 {
88 Ok(v) => v,
89 Err(e) => {
90 sync_writer.push(SyncSignal::Error(e));
91 return;
92 }
93 };
94
95 loop {
96 if let Err(RecvError) = rx.recv() {
97 break;
98 } else {
99 match blueprint
100 .stateful(&io, &res, &config, &sync_writer, &async_writer)
101 .wrap_err("Failed to prefetch inner stateful action for Repeat.")
102 {
103 Ok(inner) => {
104 queue.lock().unwrap().push_back(inner);
105 }
106 Err(e) => {
107 sync_writer.push(SyncSignal::Error(e));
108 break;
109 }
110 }
111 }
112 }
113 });
114 }
115
116 Ok(Box::new(StatefulRepeat {
117 done: false,
118 inner: self
119 .0
120 .stateful(io, res, config, sync_writer, async_writer)?,
121 queue,
122 link: tx,
123 }))
124 }
125}
126
127impl StatefulAction for StatefulRepeat {
128 impl_stateful!();
129
130 fn props(&self) -> Props {
131 (self.inner.props().bits() | INFINITE).into()
132 }
133
134 fn start(
135 &mut self,
136 sync_writer: &mut QWriter<SyncSignal>,
137 async_writer: &mut QWriter<AsyncSignal>,
138 state: &State,
139 ) -> Result<Signal> {
140 self.inner.start(sync_writer, async_writer, state)
141 }
142
143 fn update(
144 &mut self,
145 signal: &ActionSignal,
146 sync_writer: &mut QWriter<SyncSignal>,
147 async_writer: &mut QWriter<AsyncSignal>,
148 state: &State,
149 ) -> Result<Signal> {
150 let mut news = vec![];
151 news.extend(
152 self.inner
153 .update(signal, sync_writer, async_writer, state)?,
154 );
155
156 if self.inner.is_over()? {
157 if let Some(inner) = self.queue.lock().unwrap().pop_front() {
158 self.inner = inner;
159 news.extend(self.inner.start(sync_writer, async_writer, state)?);
160 } else {
161 return Err(eyre!(
162 "Failed to immediately restart action. Try increasing prefetch queue size."
163 ));
164 }
165
166 if self.link.send(()).is_err() {
167 return Err(eyre!("Action prefetcher did not respond to request."));
168 }
169 }
170
171 Ok(news.into())
172 }
173
174 fn show(
175 &mut self,
176 ui: &mut Ui,
177 sync_writer: &mut QWriter<SyncSignal>,
178 async_writer: &mut QWriter<AsyncSignal>,
179 state: &State,
180 ) -> Result<()> {
181 self.inner.show(ui, sync_writer, async_writer, state)
182 }
183
184 fn stop(
185 &mut self,
186 sync_writer: &mut QWriter<SyncSignal>,
187 async_writer: &mut QWriter<AsyncSignal>,
188 state: &State,
189 ) -> Result<Signal> {
190 self.inner.stop(sync_writer, async_writer, state)
191 }
192}