flo_scene/host/scene.rs
1use crate::host::connect_result::*;
2use crate::host::initialisation_context::*;
3use crate::host::input_stream::*;
4use crate::host::output_sink::*;
5use crate::host::scene_context::*;
6use crate::host::scene_core::*;
7use crate::host::scene_message::*;
8use crate::host::stream_id::*;
9use crate::host::stream_source::*;
10use crate::host::stream_target::*;
11use crate::host::subprogram_id::*;
12use crate::host::error::*;
13use crate::host::programs::*;
14
15use futures::prelude::*;
16use futures::channel::oneshot;
17use futures::future::{poll_fn};
18use futures::{pin_mut};
19
20use std::io::{stdin, stdout, stderr, BufReader};
21use std::sync::*;
22use std::collections::{HashSet};
23
24///
25/// A scene represents a set of running co-programs, creating a larger self-contained piece of
26/// software out of a set of smaller pieces of software that communicate via streams.
27///
28#[derive(Clone)]
29pub struct Scene {
30 core: Arc<Mutex<SceneCore>>,
31}
32
33impl Default for Scene {
34 fn default() -> Self {
35 Scene::with_standard_programs([
36 *SCENE_CONTROL_PROGRAM,
37 *OUTSIDE_SCENE_PROGRAM,
38 *STDIN_PROGRAM,
39 *STDOUT_PROGRAM,
40 *STDERR_PROGRAM,
41 *IDLE_NOTIFICATION_PROGRAM,
42 *TIMER_PROGRAM,
43 ])
44 }
45}
46
47impl Scene {
48 ///
49 /// Creates an empty scene (this has no control program so it won't start or connect any programs by default)
50 ///
51 pub fn empty() -> Self {
52 Scene {
53 core: Arc::new(Mutex::new(SceneCore::new()))
54 }
55 }
56
57 ///
58 /// Creates a new scene with a set of programs from the default set started
59 ///
60 /// For example, calling this as `Scene::with_standard_programs([*SCENE_CONTROL_PROGRAM])` will create a scene with only
61 /// the standard scene control program running.
62 ///
63 pub fn with_standard_programs(programs: impl IntoIterator<Item=SubProgramId>) -> Self {
64 let scene = Self::empty();
65 let programs = programs.into_iter().collect::<HashSet<_>>();
66
67 if programs.contains(&*SCENE_CONTROL_PROGRAM) {
68 let control_updates = SceneCore::send_updates_to_stream(&scene.core, *SCENE_CONTROL_PROGRAM);
69
70 scene.add_subprogram(*SCENE_CONTROL_PROGRAM, move |input, context| SceneControl::scene_control_program(input, context, control_updates), 0);
71 scene.connect_programs((), *SCENE_CONTROL_PROGRAM, StreamId::with_message_type::<Subscribe<SceneUpdate>>()).unwrap();
72 scene.connect_programs((), *SCENE_CONTROL_PROGRAM, StreamId::with_message_type::<Query<SceneUpdate>>()).unwrap();
73 }
74 if programs.contains(&*OUTSIDE_SCENE_PROGRAM) { scene.add_subprogram(*OUTSIDE_SCENE_PROGRAM, outside_scene_program, 0); }
75
76 if programs.contains(&*STDIN_PROGRAM) { scene.add_subprogram(*STDIN_PROGRAM, |input, context| text_input_subprogram(BufReader::new(stdin()), input, context), 0); }
77 if programs.contains(&*STDOUT_PROGRAM) { scene.add_subprogram(*STDOUT_PROGRAM, |input, context| text_io_subprogram(stdout(), input, context), 0); }
78 if programs.contains(&*STDERR_PROGRAM) { scene.add_subprogram(*STDERR_PROGRAM, |input, context| text_io_subprogram(stderr(), input, context), 0); }
79 if programs.contains(&*IDLE_NOTIFICATION_PROGRAM) { scene.add_subprogram(*IDLE_NOTIFICATION_PROGRAM, idle_subprogram, 20); }
80 if programs.contains(&*TIMER_PROGRAM) { scene.add_subprogram(*TIMER_PROGRAM, timer_subprogram, 0); }
81
82 scene
83 }
84
85 ///
86 /// Creates a duplicate scene object
87 ///
88 pub (crate) fn with_core(core: &Arc<Mutex<SceneCore>>) -> Self {
89 Scene {
90 core: core.clone()
91 }
92 }
93
94 ///
95 /// Gets a reference to the core of this scene
96 ///
97 #[inline]
98 pub (crate) fn core(&self) -> &Arc<Mutex<SceneCore>> {
99 &self.core
100 }
101
102 ///
103 /// Creates a stream that can be used to send messages into this scene from elsewhere
104 ///
105 /// This scene must have a `OUTSIDE_SCENE_PROGRAM` running in order to act as a source for these messages (and this can also be used to
106 /// connect or reconnect the streams returned by this function) .
107 ///
108 pub fn send_to_scene<TMessage>(&self, target: impl Into<StreamTarget>) -> Result<impl Sink<TMessage, Error=SceneSendError<TMessage>>, ConnectionError>
109 where
110 TMessage: 'static + SceneMessage,
111 {
112 let target = target.into();
113
114 SceneCore::initialise_message_type(&self.core, StreamId::with_message_type::<TMessage>());
115
116 // Fetch the outside scene program, which is the source for messages on this stream
117 let program_id = *OUTSIDE_SCENE_PROGRAM;
118 let program_core = self.core.lock().unwrap().get_sub_program(program_id).ok_or(ConnectionError::NoOutsideSceneSubProgram)?;
119 let stream_id = StreamId::with_message_type::<TMessage>().for_target(target.clone());
120
121 // Try to re-use an existing target
122 let existing_core = program_core.lock().unwrap().output_core(&stream_id);
123
124 if let Some(existing_core) = existing_core {
125 // Reattach to the existing output core
126 let output_sink = OutputSink::attach(program_id, existing_core, &self.core);
127 Ok(output_sink)
128 } else {
129 // Create a new target for this message
130 let sink_target = SceneCore::sink_for_target::<TMessage>(&self.core, &program_id, target)?;
131
132 // Try to attach it to the program (or just read the old version)
133 let new_or_old_target = program_core.lock().unwrap().try_create_output_target(&stream_id, sink_target);
134 let new_or_old_target = match new_or_old_target { Ok(new) => new, Err(old) => old };
135
136 // Report the new connection
137 let target_program = OutputSinkCore::target_program_id(&new_or_old_target);
138 let update = if let Some(target_program) = target_program {
139 SceneUpdate::Connected(program_id, target_program, stream_id)
140 } else {
141 SceneUpdate::Disconnected(program_id, stream_id)
142 };
143
144 SceneCore::send_scene_updates(&self.core, vec![update]);
145
146 // Create an output sink from the target
147 let output_sink = OutputSink::attach(program_id, new_or_old_target, &self.core);
148 Ok(output_sink)
149 }
150 }
151
152 ///
153 /// Returns a future that will run any waiting programs on the current thread
154 ///
155 pub fn run_scene(&self) -> impl Future<Output=()> {
156 run_core(&self.core)
157 }
158
159 ///
160 /// Returns a future that will run the scene across `num_threads` threads (including the thread this is awaited from)
161 ///
162 /// The subthreads will end when the scene is ended, or the returned future is dropped.
163 ///
164 pub fn run_scene_with_threads(&self, num_threads: usize) -> impl Future<Output=()> {
165 use futures::executor;
166 use std::thread::{JoinHandle};
167 use std::thread;
168 use std::mem;
169
170 // We take a copy of the core to run on the remote threads
171 let core = Arc::clone(&self.core);
172
173 // The dropper will stop the child threads when the main thread future is dropped
174 struct Dropper {
175 /// The senders to signal when this is dropped
176 stoppers: Vec<oneshot::Sender<()>>,
177
178 /// The join handles for waiting for the threads to shut down
179 join_handles: Vec<JoinHandle<()>>,
180 }
181
182 impl Drop for Dropper {
183 fn drop(&mut self) {
184 // Wake up all the threads and tell them to stop
185 for stopper in self.stoppers.drain(..) {
186 stopper.send(()).ok();
187 }
188
189 // Wait for all the threads to shut down before finishing the drop
190 for join_handle in self.join_handles.drain(..) {
191 join_handle.join().ok();
192 }
193 }
194 }
195
196 async move {
197 // The stoppers are used to signal the subthreads to stop when the future is dropped
198 let mut stoppers: Vec<oneshot::Sender<()>> = vec![];
199 let mut join_handles = vec![];
200
201 for _ in 1..num_threads {
202 // Create the channel used to signal the thread to stop
203 let (send_stop, recv_stop) = oneshot::channel();
204
205 // Create the thread itself
206 let core = Arc::clone(&core);
207 let join_handle = thread::spawn(move || {
208 executor::block_on(async move {
209 // Run the scene until the scene itself stops or the 'stop' event is triggered
210 let scene_runner = run_core(&core);
211
212 future::select(scene_runner, recv_stop.map(|_| ())).await;
213 });
214 });
215
216 // Stopper is signalled when the dropper is dropped, and the join handles are awaited at that time too
217 stoppers.push(send_stop);
218 join_handles.push(join_handle);
219 }
220
221 // The dropper will be dropped when this returned future is done
222 let dropper = Dropper { stoppers, join_handles };
223
224 // Run the scene on this thread as well
225 run_core(&core).await;
226
227 // Dropper will ensure that all the subthreads are shutdown (if we reach here, or if the future is dropped ahead of time)
228 mem::drop(dropper);
229 }
230 }
231}
232
233impl SceneInitialisationContext for Scene {
234 ///
235 /// Connects the output `stream` of the `source` program to the input of `target`
236 ///
237 /// Sub-programs can send messages without needing to know what handles them, for instance by creating an output stream using
238 /// `scene_context.send(())`. This call provides the means to specify how these streams are connected, for example by
239 /// calling `scene.connect_programs((), some_target_program_id, StreamId::with_message_type::<SomeMessageType>())` to connect
240 /// everything that sends `SomeMessageType` to the subprogram with the ID `some_target_program_id`.
241 ///
242 /// The parameters can be used to specify exactly which stream should be redirected: it's possible to redirect only the streams
243 /// originating from a specific subprogram, or even streams that requested a particular target. A filtering mechanism is also
244 /// provided, in case it's necessary to change the type of the message to suit the target.
245 ///
246 /// The target is usually a specific program, but can also be `StreamTarget::None` to indicate that any messages should be
247 /// dropped with no further action. `StreamTarget::Any` is the default, and will result in the stream blocking until another
248 /// call connects it.
249 ///
250 /// The stream ID specifies which of the streams originating from the souce should be connected. This can either be created
251 /// using `StreamId::with_message_type::<SomeMessage>()` to indicate all outgoing streams of that type from `source`, or
252 /// `StreamId::with_message_type::<SomeMessage>().for_target(target)` to indicate an outgoing stream with a specific destination.
253 ///
254 /// Examples:
255 ///
256 /// ```
257 /// # use flo_scene::*;
258 /// # use futures::prelude::*;
259 /// # use serde::*;
260 /// #
261 /// # #[derive(Serialize, Deserialize)]
262 /// # enum ExampleMessage { Test };
263 /// # impl SceneMessage for ExampleMessage { }
264 /// # #[derive(Serialize, Deserialize)]
265 /// # enum FilteredMessage { Test };
266 /// # impl SceneMessage for FilteredMessage { }
267 /// # let scene = Scene::empty();
268 /// # let subprogram = SubProgramId::new();
269 /// # let source_program = SubProgramId::new();
270 /// # let other_program = SubProgramId::new();
271 /// # let example_filter = FilterHandle::for_filter(|input_stream: InputStream<FilteredMessage>| input_stream.map(|_| ExampleMessage::Test));
272 /// #
273 /// // Connect all the 'ExampleMessage' streams to one program
274 /// scene.connect_programs((), &subprogram, StreamId::with_message_type::<ExampleMessage>());
275 ///
276 /// // Direct the messages for the source_program to other_program instead (takes priority over the 'any' example set up above)
277 /// scene.connect_programs(&source_program, &other_program, StreamId::with_message_type::<ExampleMessage>());
278 ///
279 /// // Make 'other_program' throw away its messages
280 /// scene.connect_programs(&other_program, StreamTarget::None, StreamId::with_message_type::<ExampleMessage>());
281 ///
282 /// // When 'source_program' tries to connect directly to 'subprogram', send its output to 'other_program' instead
283 /// scene.connect_programs(&source_program, &other_program, StreamId::with_message_type::<ExampleMessage>().for_target(&subprogram));
284 ///
285 /// // Use a filter to accept a different incoming message type for a target program
286 /// scene.connect_programs((), StreamTarget::Filtered(example_filter.clone(), other_program), StreamId::with_message_type::<FilteredMessage>());
287 /// scene.connect_programs(&example_filter, StreamTarget::Program(other_program), StreamId::with_message_type::<FilteredMessage>());
288 ///
289 /// // Filter any output if it's connected to an input of a specified type
290 /// scene.connect_programs(&example_filter, (), StreamId::with_message_type::<FilteredMessage>().for_target(&subprogram));
291 /// ```
292 ///
293 fn connect_programs(&self, source: impl Into<StreamSource>, target: impl Into<StreamTarget>, stream: impl Into<StreamId>) -> Result<ConnectionResult, ConnectionError> {
294 // Convert the source & target, then pass the request on to the core
295 let source = source.into();
296 let target = target.into();
297 let stream = stream.into();
298
299 SceneCore::connect_programs(&self.core, source, target, stream)
300 }
301
302 ///
303 /// Adds a subprogram to run in this scene
304 ///
305 fn add_subprogram<'a, TProgramFn, TInputMessage, TFuture>(&'a self, program_id: SubProgramId, program: TProgramFn, max_input_waiting: usize)
306 where
307 TFuture: 'static + Send + Future<Output=()>,
308 TInputMessage: 'static + SceneMessage,
309 TProgramFn: 'a + Send + FnOnce(InputStream<TInputMessage>, SceneContext) -> TFuture,
310 {
311 // Create the context and input stream for the program
312 let input_stream = InputStream::new(program_id, &self.core, max_input_waiting);
313 let input_core = input_stream.core();
314
315 // Create the future that will be used to run the future
316 let (send_context, recv_context) = oneshot::channel::<(TFuture, SceneContext)>();
317 let run_program = async move {
318 if let Ok((program, scene_context)) = recv_context.await {
319 // Start the program running
320 pin_mut!(program);
321
322 // Poll the program with the scene context set
323 poll_fn(|context| {
324 with_scene_context(&scene_context, || {
325 program.as_mut().poll(context)
326 })
327 }).await;
328 }
329 };
330
331 // Start the program running
332 let subprogram = SceneCore::start_subprogram(&self.core, program_id, run_program, input_core);
333
334 // Call the start function to create the future, and pass it into the program that was started
335 let context = SceneContext::new(&self.core, &subprogram);
336 let program = with_scene_context(&context, || program(input_stream, context.clone()));
337
338 send_context.send((program, context)).ok();
339 }
340}