flo_scene/host/
scene.rs

1use crate::host::connect_result::*;
2use crate::host::initialisation_context::*;
3use crate::host::input_stream::*;
4use crate::host::output_sink::*;
5use crate::host::scene_context::*;
6use crate::host::scene_core::*;
7use crate::host::scene_message::*;
8use crate::host::stream_id::*;
9use crate::host::stream_source::*;
10use crate::host::stream_target::*;
11use crate::host::subprogram_id::*;
12use crate::host::error::*;
13use crate::host::programs::*;
14
15use futures::prelude::*;
16use futures::channel::oneshot;
17use futures::future::{poll_fn};
18use futures::{pin_mut};
19
20use std::io::{stdin, stdout, stderr, BufReader};
21use std::sync::*;
22use std::collections::{HashSet};
23
24///
25/// A scene represents a set of running co-programs, creating a larger self-contained piece of
26/// software out of a set of smaller pieces of software that communicate via streams.
27///
28#[derive(Clone)]
29pub struct Scene {
30    core: Arc<Mutex<SceneCore>>,
31}
32
33impl Default for Scene {
34    fn default() -> Self {
35        Scene::with_standard_programs([
36            *SCENE_CONTROL_PROGRAM,
37            *OUTSIDE_SCENE_PROGRAM,
38            *STDIN_PROGRAM,
39            *STDOUT_PROGRAM,
40            *STDERR_PROGRAM,
41            *IDLE_NOTIFICATION_PROGRAM,
42            *TIMER_PROGRAM,
43        ])
44    }
45}
46
47impl Scene {
48    ///
49    /// Creates an empty scene (this has no control program so it won't start or connect any programs by default)
50    ///
51    pub fn empty() -> Self {
52        Scene {
53            core: Arc::new(Mutex::new(SceneCore::new()))
54        }
55    }
56
57    ///
58    /// Creates a new scene with a set of programs from the default set started
59    ///
60    /// For example, calling this as `Scene::with_standard_programs([*SCENE_CONTROL_PROGRAM])` will create a scene with only
61    /// the standard scene control program running.
62    ///
63    pub fn with_standard_programs(programs: impl IntoIterator<Item=SubProgramId>) -> Self {
64        let scene       = Self::empty();
65        let programs    = programs.into_iter().collect::<HashSet<_>>();
66
67        if programs.contains(&*SCENE_CONTROL_PROGRAM) {
68            let control_updates = SceneCore::send_updates_to_stream(&scene.core, *SCENE_CONTROL_PROGRAM);
69
70            scene.add_subprogram(*SCENE_CONTROL_PROGRAM, move |input, context| SceneControl::scene_control_program(input, context, control_updates), 0);
71            scene.connect_programs((), *SCENE_CONTROL_PROGRAM, StreamId::with_message_type::<Subscribe<SceneUpdate>>()).unwrap();
72            scene.connect_programs((), *SCENE_CONTROL_PROGRAM, StreamId::with_message_type::<Query<SceneUpdate>>()).unwrap();
73        }
74        if programs.contains(&*OUTSIDE_SCENE_PROGRAM)       { scene.add_subprogram(*OUTSIDE_SCENE_PROGRAM, outside_scene_program, 0); }
75
76        if programs.contains(&*STDIN_PROGRAM)               { scene.add_subprogram(*STDIN_PROGRAM, |input, context| text_input_subprogram(BufReader::new(stdin()), input, context), 0); }
77        if programs.contains(&*STDOUT_PROGRAM)              { scene.add_subprogram(*STDOUT_PROGRAM, |input, context| text_io_subprogram(stdout(), input, context), 0); }
78        if programs.contains(&*STDERR_PROGRAM)              { scene.add_subprogram(*STDERR_PROGRAM, |input, context| text_io_subprogram(stderr(), input, context), 0); }
79        if programs.contains(&*IDLE_NOTIFICATION_PROGRAM)   { scene.add_subprogram(*IDLE_NOTIFICATION_PROGRAM, idle_subprogram, 20); }
80        if programs.contains(&*TIMER_PROGRAM)               { scene.add_subprogram(*TIMER_PROGRAM, timer_subprogram, 0); }
81
82        scene
83    }
84
85    ///
86    /// Creates a duplicate scene object
87    ///
88    pub (crate) fn with_core(core: &Arc<Mutex<SceneCore>>) -> Self {
89        Scene {
90            core: core.clone()
91        }
92    }
93
94    ///
95    /// Gets a reference to the core of this scene
96    ///
97    #[inline]
98    pub (crate) fn core(&self) -> &Arc<Mutex<SceneCore>> {
99        &self.core
100    }
101
102    ///
103    /// Creates a stream that can be used to send messages into this scene from elsewhere
104    ///
105    /// This scene must have a `OUTSIDE_SCENE_PROGRAM` running in order to act as a source for these messages (and this can also be used to
106    /// connect or reconnect the streams returned by this function) .
107    ///
108    pub fn send_to_scene<TMessage>(&self, target: impl Into<StreamTarget>) -> Result<impl Sink<TMessage, Error=SceneSendError<TMessage>>, ConnectionError> 
109    where
110        TMessage: 'static + SceneMessage,
111    {
112        let target = target.into();
113
114        SceneCore::initialise_message_type(&self.core, StreamId::with_message_type::<TMessage>());
115
116        // Fetch the outside scene program, which is the source for messages on this stream
117        let program_id      = *OUTSIDE_SCENE_PROGRAM;
118        let program_core    = self.core.lock().unwrap().get_sub_program(program_id).ok_or(ConnectionError::NoOutsideSceneSubProgram)?;
119        let stream_id       = StreamId::with_message_type::<TMessage>().for_target(target.clone());
120
121        // Try to re-use an existing target
122        let existing_core = program_core.lock().unwrap().output_core(&stream_id);
123
124        if let Some(existing_core) = existing_core {
125            // Reattach to the existing output core
126            let output_sink = OutputSink::attach(program_id, existing_core, &self.core);
127            Ok(output_sink)
128        } else {
129            // Create a new target for this message
130            let sink_target = SceneCore::sink_for_target::<TMessage>(&self.core, &program_id, target)?;
131
132            // Try to attach it to the program (or just read the old version)
133            let new_or_old_target = program_core.lock().unwrap().try_create_output_target(&stream_id, sink_target);
134            let new_or_old_target = match new_or_old_target { Ok(new) => new, Err(old) => old };
135
136            // Report the new connection
137            let target_program  = OutputSinkCore::target_program_id(&new_or_old_target);
138            let update          = if let Some(target_program) = target_program {
139                SceneUpdate::Connected(program_id, target_program, stream_id)
140            } else {
141                SceneUpdate::Disconnected(program_id, stream_id)
142            };
143
144            SceneCore::send_scene_updates(&self.core, vec![update]);
145
146            // Create an output sink from the target
147            let output_sink = OutputSink::attach(program_id, new_or_old_target, &self.core);
148            Ok(output_sink)
149        }
150    }
151
152    ///
153    /// Returns a future that will run any waiting programs on the current thread
154    ///
155    pub fn run_scene(&self) -> impl Future<Output=()> {
156        run_core(&self.core)
157    }
158
159    ///
160    /// Returns a future that will run the scene across `num_threads` threads (including the thread this is awaited from)
161    ///
162    /// The subthreads will end when the scene is ended, or the returned future is dropped.
163    ///
164    pub fn run_scene_with_threads(&self, num_threads: usize) -> impl Future<Output=()> {
165        use futures::executor;
166        use std::thread::{JoinHandle};
167        use std::thread;
168        use std::mem;
169
170        // We take a copy of the core to run on the remote threads
171        let core = Arc::clone(&self.core);
172
173        // The dropper will stop the child threads when the main thread future is dropped
174        struct Dropper {
175            /// The senders to signal when this is dropped
176            stoppers: Vec<oneshot::Sender<()>>,
177
178            /// The join handles for waiting for the threads to shut down
179            join_handles: Vec<JoinHandle<()>>,
180        }
181
182        impl Drop for Dropper {
183            fn drop(&mut self) {
184                // Wake up all the threads and tell them to stop
185                for stopper in self.stoppers.drain(..) {
186                    stopper.send(()).ok();
187                }
188
189                // Wait for all the threads to shut down before finishing the drop
190                for join_handle in self.join_handles.drain(..) {
191                    join_handle.join().ok();
192                }
193            }
194        }
195
196        async move {
197            // The stoppers are used to signal the subthreads to stop when the future is dropped
198            let mut stoppers: Vec<oneshot::Sender<()>>  = vec![];
199            let mut join_handles                        = vec![];
200
201            for _ in 1..num_threads {
202                // Create the channel used to signal the thread to stop
203                let (send_stop, recv_stop) = oneshot::channel();
204
205                // Create the thread itself
206                let core        = Arc::clone(&core);
207                let join_handle = thread::spawn(move || {
208                    executor::block_on(async move {
209                        // Run the scene until the scene itself stops or the 'stop' event is triggered
210                        let scene_runner = run_core(&core);
211
212                        future::select(scene_runner, recv_stop.map(|_| ())).await;
213                    });
214                });
215
216                // Stopper is signalled when the dropper is dropped, and the join handles are awaited at that time too
217                stoppers.push(send_stop);
218                join_handles.push(join_handle);
219            }
220
221            // The dropper will be dropped when this returned future is done
222            let dropper = Dropper { stoppers, join_handles };
223
224            // Run the scene on this thread as well
225            run_core(&core).await;
226
227            // Dropper will ensure that all the subthreads are shutdown (if we reach here, or if the future is dropped ahead of time)
228            mem::drop(dropper);
229        }
230    }
231}
232
233impl SceneInitialisationContext for Scene {
234    ///
235    /// Connects the output `stream` of the `source` program to the input of `target`
236    ///
237    /// Sub-programs can send messages without needing to know what handles them, for instance by creating an output stream using
238    /// `scene_context.send(())`. This call provides the means to specify how these streams are connected, for example by
239    /// calling `scene.connect_programs((), some_target_program_id, StreamId::with_message_type::<SomeMessageType>())` to connect
240    /// everything that sends `SomeMessageType` to the subprogram with the ID `some_target_program_id`.
241    ///
242    /// The parameters can be used to specify exactly which stream should be redirected: it's possible to redirect only the streams
243    /// originating from a specific subprogram, or even streams that requested a particular target. A filtering mechanism is also
244    /// provided, in case it's necessary to change the type of the message to suit the target.
245    ///
246    /// The target is usually a specific program, but can also be `StreamTarget::None` to indicate that any messages should be
247    /// dropped with no further action. `StreamTarget::Any` is the default, and will result in the stream blocking until another
248    /// call connects it.
249    ///
250    /// The stream ID specifies which of the streams originating from the souce should be connected. This can either be created
251    /// using `StreamId::with_message_type::<SomeMessage>()` to indicate all outgoing streams of that type from `source`, or 
252    /// `StreamId::with_message_type::<SomeMessage>().for_target(target)` to indicate an outgoing stream with a specific destination.
253    ///
254    /// Examples:
255    ///
256    /// ```
257    /// #   use flo_scene::*;
258    /// #   use futures::prelude::*;
259    /// #   use serde::*;
260    /// #
261    /// #   #[derive(Serialize, Deserialize)]
262    /// #   enum ExampleMessage { Test };
263    /// #   impl SceneMessage for ExampleMessage { }
264    /// #   #[derive(Serialize, Deserialize)]
265    /// #   enum FilteredMessage { Test };
266    /// #   impl SceneMessage for FilteredMessage { }
267    /// #   let scene           = Scene::empty();
268    /// #   let subprogram      = SubProgramId::new();
269    /// #   let source_program  = SubProgramId::new();
270    /// #   let other_program   = SubProgramId::new();
271    /// #   let example_filter  = FilterHandle::for_filter(|input_stream: InputStream<FilteredMessage>| input_stream.map(|_| ExampleMessage::Test));
272    /// #
273    /// // Connect all the 'ExampleMessage' streams to one program
274    /// scene.connect_programs((), &subprogram, StreamId::with_message_type::<ExampleMessage>());
275    /// 
276    /// // Direct the messages for the source_program to other_program instead (takes priority over the 'any' example set up above)
277    /// scene.connect_programs(&source_program, &other_program, StreamId::with_message_type::<ExampleMessage>());
278    ///
279    /// // Make 'other_program' throw away its messages
280    /// scene.connect_programs(&other_program, StreamTarget::None, StreamId::with_message_type::<ExampleMessage>());
281    ///
282    /// // When 'source_program' tries to connect directly to 'subprogram', send its output to 'other_program' instead
283    /// scene.connect_programs(&source_program, &other_program, StreamId::with_message_type::<ExampleMessage>().for_target(&subprogram));
284    ///
285    /// // Use a filter to accept a different incoming message type for a target program
286    /// scene.connect_programs((), StreamTarget::Filtered(example_filter.clone(), other_program), StreamId::with_message_type::<FilteredMessage>());
287    /// scene.connect_programs(&example_filter, StreamTarget::Program(other_program), StreamId::with_message_type::<FilteredMessage>());
288    ///
289    /// // Filter any output if it's connected to an input of a specified type
290    /// scene.connect_programs(&example_filter, (), StreamId::with_message_type::<FilteredMessage>().for_target(&subprogram));
291    /// ```
292    ///
293    fn connect_programs(&self, source: impl Into<StreamSource>, target: impl Into<StreamTarget>, stream: impl Into<StreamId>) -> Result<ConnectionResult, ConnectionError> {
294        // Convert the source & target, then pass the request on to the core
295        let source = source.into();
296        let target = target.into();
297        let stream = stream.into();
298
299        SceneCore::connect_programs(&self.core, source, target, stream)
300    }
301
302    ///
303    /// Adds a subprogram to run in this scene
304    ///
305    fn add_subprogram<'a, TProgramFn, TInputMessage, TFuture>(&'a self, program_id: SubProgramId, program: TProgramFn, max_input_waiting: usize)
306    where
307        TFuture:        'static + Send + Future<Output=()>,
308        TInputMessage:  'static + SceneMessage,
309        TProgramFn:     'a + Send + FnOnce(InputStream<TInputMessage>, SceneContext) -> TFuture,
310    {
311        // Create the context and input stream for the program
312        let input_stream    = InputStream::new(program_id, &self.core, max_input_waiting);
313        let input_core      = input_stream.core();
314
315        // Create the future that will be used to run the future
316        let (send_context, recv_context) = oneshot::channel::<(TFuture, SceneContext)>();
317        let run_program = async move {
318            if let Ok((program, scene_context)) = recv_context.await {
319                // Start the program running
320                pin_mut!(program);
321
322                // Poll the program with the scene context set
323                poll_fn(|context| {
324                    with_scene_context(&scene_context, || {
325                        program.as_mut().poll(context)
326                    })
327                }).await;
328            }
329        };
330
331        // Start the program running
332        let subprogram = SceneCore::start_subprogram(&self.core, program_id, run_program, input_core);
333
334        // Call the start function to create the future, and pass it into the program that was started
335        let context = SceneContext::new(&self.core, &subprogram);
336        let program = with_scene_context(&context, || program(input_stream, context.clone()));
337
338        send_context.send((program, context)).ok();
339    }
340}