Skip to main content

kozan_platform/pipeline/
mod.rs

//! Window pipeline: orchestrates the per-window view thread and render thread.
//!
//! Chrome equivalent: `RenderWidgetHost` — creates channels, spawns threads, routes events.
//! All channels are created before any thread starts, so each thread receives its
//! endpoints up front and neither has to wait for the other (no chicken-and-egg).
5
6pub mod input_state;
7pub mod render_loop;
8pub mod view_loop;
9
10use std::sync::Arc;
11use std::sync::mpsc;
12use std::thread;
13
14use kozan_core::widget::FrameWidget;
15use kozan_scheduler::{Scheduler, WakeSender};
16
17use crate::context::ViewContext;
18use crate::event::ViewEvent;
19use crate::host::PlatformHost;
20use crate::id::WindowId;
21use crate::renderer::RenderSurface;
22use crate::view_thread::{SpawnError, ViewThreadHandle};
23
24use self::render_loop::{OnSurfaceLost, RenderEvent, RenderLoop};
25
/// Size and display characteristics of a window's viewport.
///
/// The value is `Copy`, so the same snapshot can be handed to both the render
/// thread and the view thread when a pipeline is spawned.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct ViewportInfo {
    /// Viewport width (NOTE(review): physical vs. logical pixels is not
    /// established in this module — confirm against the caller).
    pub width: u32,
    /// Viewport height, in the same unit as `width`.
    pub height: u32,
    /// Scale factor forwarded to the render loop and the frame widget.
    pub scale_factor: f64,
    /// Display refresh rate in millihertz (60 Hz is `60_000`), or `None` when
    /// it is not known.
    pub refresh_rate_millihertz: Option<u32>,
}
33
34pub struct PipelineConfig<S> {
35    pub surface: S,
36    pub on_surface_lost: OnSurfaceLost,
37    pub host: Arc<dyn PlatformHost>,
38    pub window_id: WindowId,
39    pub viewport: ViewportInfo,
40}
41
42pub struct RenderThreadHandle {
43    sender: mpsc::Sender<RenderEvent>,
44    join_handle: Option<thread::JoinHandle<()>>,
45}
46
47impl RenderThreadHandle {
48    pub fn send(&self, event: RenderEvent) -> bool {
49        self.sender.send(event).is_ok()
50    }
51
52    pub fn shutdown(&mut self) {
53        let _ = self.sender.send(RenderEvent::Shutdown);
54        if let Some(h) = self.join_handle.take() {
55            let _ = h.join();
56        }
57    }
58}
59
60impl Drop for RenderThreadHandle {
61    fn drop(&mut self) {
62        self.shutdown();
63    }
64}
65
66/// Per-window thread pair.
67pub struct WindowPipeline {
68    render_handle: RenderThreadHandle,
69    view_handle: ViewThreadHandle,
70    render_tx: mpsc::Sender<RenderEvent>,
71}
72
73impl WindowPipeline {
74    pub fn spawn<S, F>(config: PipelineConfig<S>, view_init: F) -> Result<Self, SpawnError>
75    where
76        S: RenderSurface + 'static,
77        F: FnOnce(&ViewContext) + Send + 'static,
78    {
79        let vp = config.viewport;
80        let (render_tx, render_rx) = mpsc::channel();
81        let (view_tx, view_rx) = mpsc::channel();
82
83        let render_join = spawn_render(RenderDeps {
84            surface: config.surface,
85            on_lost: config.on_surface_lost,
86            rx: render_rx,
87            view_tx: view_tx.clone(),
88            viewport: vp,
89        })?;
90
91        let view_handle = spawn_view(
92            ViewDeps {
93                rx: view_rx,
94                tx: view_tx,
95                render_tx: render_tx.clone(),
96                host: config.host,
97                window_id: config.window_id,
98                viewport: vp,
99            },
100            view_init,
101        )?;
102
103        Ok(Self {
104            render_handle: RenderThreadHandle {
105                sender: render_tx.clone(),
106                join_handle: Some(render_join),
107            },
108            view_handle,
109            render_tx,
110        })
111    }
112
113    pub fn send_to_view(&self, event: ViewEvent) -> bool {
114        self.view_handle.send(event)
115    }
116
117    pub fn send_to_render(&self, event: RenderEvent) -> bool {
118        self.render_tx.send(event).is_ok()
119    }
120
121    pub fn shutdown(&mut self) {
122        self.render_handle.shutdown();
123        self.view_handle.shutdown();
124    }
125}
126
127impl Drop for WindowPipeline {
128    fn drop(&mut self) {
129        self.shutdown();
130    }
131}
132
133// ── Render thread ────────────────────────────────────────────
134
135struct RenderDeps<S> {
136    surface: S,
137    on_lost: OnSurfaceLost,
138    rx: mpsc::Receiver<RenderEvent>,
139    view_tx: mpsc::Sender<ViewEvent>,
140    viewport: ViewportInfo,
141}
142
143fn spawn_render<S: RenderSurface + 'static>(
144    deps: RenderDeps<S>,
145) -> Result<thread::JoinHandle<()>, SpawnError> {
146    let vp = deps.viewport;
147    thread::Builder::new()
148        .name("kozan-render".into())
149        .spawn(move || {
150            RenderLoop::new(
151                deps.surface,
152                deps.view_tx,
153                deps.on_lost,
154                vp.width,
155                vp.height,
156                vp.scale_factor,
157            )
158            .run(deps.rx);
159        })
160        .map_err(SpawnError::ThreadSpawn)
161}
162
163// ── View thread ──────────────────────────────────────────────
164
165struct ViewDeps {
166    rx: mpsc::Receiver<ViewEvent>,
167    tx: mpsc::Sender<ViewEvent>,
168    render_tx: mpsc::Sender<RenderEvent>,
169    host: Arc<dyn PlatformHost>,
170    window_id: WindowId,
171    viewport: ViewportInfo,
172}
173
174fn spawn_view<F: FnOnce(&ViewContext) + Send + 'static>(
175    deps: ViewDeps,
176    init: F,
177) -> Result<ViewThreadHandle, SpawnError> {
178    let (ws_tx, ws_rx) = mpsc::sync_channel::<WakeSender>(1);
179    let tx_clone = deps.tx.clone();
180
181    let join = thread::Builder::new()
182        .name("kozan-view".into())
183        .spawn(move || view_main(deps, ws_tx, init))
184        .map_err(SpawnError::ThreadSpawn)?;
185
186    let wake = ws_rx.recv().map_err(|_| SpawnError::SetupFailed)?;
187    Ok(ViewThreadHandle::from_parts(tx_clone, wake, join))
188}
189
190fn view_main<F: FnOnce(&ViewContext)>(
191    deps: ViewDeps,
192    ws_tx: mpsc::SyncSender<WakeSender>,
193    init: F,
194) {
195    let (mut scheduler, wake) = new_scheduler(&deps);
196    if ws_tx.send(wake.clone()).is_err() {
197        return;
198    }
199
200    let mut ctx = new_view_context(&deps, wake);
201    init(&ctx);
202    for future in ctx.take_staged_futures() {
203        scheduler.spawn(future);
204    }
205
206    ctx.invalidate_style();
207    scheduler.set_needs_frame();
208    crate::pipeline::view_loop::run(&mut scheduler, &mut ctx, &deps.rx);
209}
210
211fn new_scheduler(deps: &ViewDeps) -> (Scheduler, WakeSender) {
212    let (mut sched, mut wake) = Scheduler::new();
213
214    if let Some(mhz) = deps.viewport.refresh_rate_millihertz {
215        let budget = std::time::Duration::from_micros(1_000_000_000 / mhz as u64);
216        sched.frame_scheduler_mut().set_frame_budget(budget);
217    }
218
219    let tx = deps.tx.clone();
220    let notify: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
221        let _ = tx.send(ViewEvent::Paint);
222    });
223    wake.set_notify(Arc::clone(&notify));
224    sched.set_executor_notify(notify);
225
226    (sched, wake)
227}
228
229fn new_view_context(deps: &ViewDeps, wake: WakeSender) -> ViewContext {
230    let mut frame = FrameWidget::new();
231    frame.resize(deps.viewport.width, deps.viewport.height);
232    frame.set_scale_factor(deps.viewport.scale_factor);
233    ViewContext::new(
234        frame,
235        wake,
236        deps.host.clone(),
237        deps.window_id,
238        deps.render_tx.clone(),
239    )
240}