use super::core::*;
use super::state::*;
use super::update::*;
use super::super::control::*;
use super::super::controller::*;
use super::super::diff_viewmodel::*;
use desync::*;
use binding::*;
use futures::*;
use futures::task::Task;
use std::mem;
use std::sync::*;
/// Mutable state belonging to a single update stream.
///
/// `UiUpdateStream` wraps this in a `Desync`, so it is only ever touched from
/// closures passed to that object's `async`/`sync` calls.
struct UpdateStreamCore {
    /// Controller whose viewmodel is watched and turned into viewmodel updates
    controller: Arc<Controller>,

    /// Session state used to work out which updates need to be sent
    state: UiSessionState,

    /// Update ID most recently recorded for this stream (compared against the
    /// session's `last_update_id()` to detect changes)
    last_update_id: u64,

    /// Task to notify once an update has been placed in the pending slot, if any
    waiting: Option<Task>,
}
/// Stream yielding batches of `UiUpdate`s for a UI session.
///
/// See the `Stream` implementation for how batches are produced.
pub struct UiUpdateStream {
    /// The core of the session this stream reports updates for
    session_core: Arc<Desync<UiSessionCore>>,

    /// State private to this stream
    stream_core: Arc<Desync<UpdateStreamCore>>,

    /// The batch of updates that the next `poll` will hand out, or `None` if
    /// nothing is ready yet
    pending: Arc<Mutex<Option<Vec<UiUpdate>>>>,
}
impl UiUpdateStream {
    /// Creates a new update stream for the session represented by `core`.
    ///
    /// Before returning, this schedules two jobs on the session core: first
    /// the one that installs the stream state's watchers, then the one that
    /// prepares the initial batch of updates.
    pub fn new(controller: Arc<Controller>, core: Arc<Desync<UiSessionCore>>) -> UiUpdateStream {
        let stream_core = Arc::new(Desync::new(UpdateStreamCore::new(controller)));

        // Watcher setup is scheduled ahead of the initial event
        Self::initialise_core(Arc::clone(&core), Arc::clone(&stream_core));

        let stream = UiUpdateStream {
            session_core: core,
            stream_core: stream_core,
            pending: Arc::new(Mutex::new(None)),
        };
        stream.generate_initial_event();

        stream
    }

    /// Schedules a job on the session core that fetches the UI tree binding
    /// and then schedules `setup_state` on the stream core with it.
    fn initialise_core(session_core: Arc<Desync<UiSessionCore>>, stream_core: Arc<Desync<UpdateStreamCore>>) {
        session_core.async(move |session| {
            let ui_tree = session.ui_tree();

            stream_core.async(move |stream| stream.setup_state(&ui_tree));
        });
    }

    /// Schedules generation of the first batch of updates for this stream.
    ///
    /// The batch always begins with `UiUpdate::Start`, is followed by the UI
    /// update reported by the stream state (when there is one) and ends with
    /// a viewmodel update built from the controller tree. It is stored in the
    /// pending slot, and any task recorded as waiting is notified.
    fn generate_initial_event(&self) {
        let stream_core = Arc::clone(&self.stream_core);
        let pending = Arc::clone(&self.pending);

        self.session_core.async(move |session| {
            // Capture the session's view of the world at the time this job runs
            let update_id = session.last_update_id();
            let ui_binding = session.ui_tree();

            stream_core.async(move |stream| {
                let ui_tree = ui_binding.get();
                let initial_ui = stream.state.update_ui(&ui_tree);
                let viewmodel = viewmodel_update_controller_tree(&*stream.controller);

                // Start, then the optional UI update, then the viewmodel
                let mut updates = vec![UiUpdate::Start];
                updates.extend(initial_ui);
                updates.push(UiUpdate::UpdateViewModel(viewmodel));

                // The slot stays locked until this job finishes, so the
                // notification below is sent with the batch already in place
                let mut slot = pending.lock().unwrap();
                *slot = Some(updates);
                stream.last_update_id = update_id;

                // Wake whatever was waiting, clearing it in the process
                if let Some(task) = mem::replace(&mut stream.waiting, None) {
                    task.notify();
                }
            });
        });
    }
}
impl UpdateStreamCore {
    /// Creates a stream core for `controller` with a fresh session state,
    /// an update ID of 0 and no waiting task.
    pub fn new(controller: Arc<Controller>) -> UpdateStreamCore {
        let state = UiSessionState::new();

        UpdateStreamCore {
            controller: controller,
            state: state,
            last_update_id: 0,
            waiting: None,
        }
    }

    /// Asks the session state to watch the controller's viewmodel and the
    /// canvases reachable from `ui_binding`.
    pub fn setup_state(&mut self, ui_binding: &BindRef<Control>) {
        let controller = Arc::clone(&self.controller);

        self.state.watch_viewmodel(controller);
        self.state.watch_canvases(ui_binding);
    }

    /// Publishes the updates for `update_id` to `pending` if they are wanted.
    ///
    /// Nothing happens when `update_id` matches the ID already recorded, when
    /// `pending` already holds a batch, or when no task is waiting. Otherwise
    /// the updates are fetched from the session state, stored in `pending`,
    /// the recorded ID becomes `update_id`, and the waiting task is notified.
    /// The waiting task is left in place afterwards.
    pub fn finish_update(&mut self, ui_binding: &BindRef<Control>, update_id: u64, pending: Arc<Mutex<Option<Vec<UiUpdate>>>>) {
        // This update has already been dealt with
        if self.last_update_id == update_id {
            return;
        }

        let mut slot = pending.lock().unwrap();

        // Only an empty slot may be filled, and only when something is waiting
        if slot.is_none() {
            if let Some(ref task) = self.waiting {
                *slot = Some(self.state.get_updates(ui_binding));
                self.last_update_id = update_id;

                // Let the waiting task know its update has arrived
                task.notify();
            }
        }
    }
}
impl Stream for UiUpdateStream {
    type Item = Vec<UiUpdate>;
    type Error = ();

    /// Returns the pending batch of updates if there is one.
    ///
    /// Otherwise a job is scheduled on the session core and `NotReady` is
    /// returned. That job does one of three things:
    ///
    /// * notifies the current task if a batch has appeared in the meantime,
    /// * generates a batch and notifies the task if the session's update ID
    ///   differs from the one recorded for this stream, or
    /// * records the task as waiting and registers an `on_next_update`
    ///   callback that schedules `finish_update` on the stream core.
    fn poll(&mut self) -> Poll<Option<Vec<UiUpdate>>, Self::Error> {
        // This guard is not released until `poll` returns, so the job
        // scheduled below is created while the pending slot is still locked
        let mut ready = self.pending.lock().unwrap();

        if let Some(updates) = mem::replace(&mut *ready, None) {
            return Ok(Async::Ready(Some(updates)));
        }

        // Nothing to hand out yet: arrange for this task to be woken later
        let current_task = task::current();

        // One set of handles for the check itself...
        let pending = Arc::clone(&self.pending);
        let stream_core = Arc::clone(&self.stream_core);

        // ...and another for the callback run on the session's next update
        let update_pending = Arc::clone(&self.pending);
        let update_stream_core = Arc::clone(&self.stream_core);

        self.session_core.async(move |session| {
            stream_core.sync(move |stream| {
                let mut slot = pending.lock().unwrap();

                if slot.is_some() {
                    // A batch arrived after `poll` found the slot empty
                    current_task.notify();
                } else if session.last_update_id() != stream.last_update_id {
                    // The session has moved on since this stream last reported
                    *slot = Some(stream.state.get_updates(&session.ui_tree()));
                    stream.last_update_id = session.last_update_id();
                    current_task.notify();
                } else {
                    // Up to date: wait for the session's next update
                    stream.waiting = Some(current_task);

                    let ui_binding = session.ui_tree();
                    session.on_next_update(move |updated_session| {
                        let this_update_id = updated_session.last_update_id();

                        update_stream_core.async(move |updated_stream| {
                            updated_stream.finish_update(&ui_binding, this_update_id, update_pending)
                        });
                    });
                }
            });
        });

        Ok(Async::NotReady)
    }
}