use crate::RenderMode;
use crate::dispatch::Dispatcher;
use crate::dispatch::multi_threaded::cost::{COST_THRESHOLD, estimate_render_task_cost};
use crate::dispatch::multi_threaded::worker::Worker;
use crate::fine::{Fine, FineKernel};
use crate::kurbo::{Affine, BezPath, PathEl, Point, Rect, Stroke};
use crate::peniko::{BlendMode, Fill};
use crate::region::Regions;
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::fmt::{Debug, Formatter};
use crossbeam_channel::TryRecvError;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::cell::RefCell;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{Barrier, Mutex};
use thread_local::ThreadLocal;
use vello_common::clip::ClipContext;
use vello_common::coarse::{Cmd, MODE_CPU, Wide};
use vello_common::encode::EncodedPaint;
use vello_common::fearless_simd::{Level, Simd, dispatch};
use vello_common::filter_effects::Filter;
use vello_common::mask::Mask;
use vello_common::paint::{ImageResolver, Paint};
use vello_common::render_graph::RenderGraph;
use vello_common::strip::Strip;
use vello_common::strip_generator::{StripGenerator, StripStorage};
mod cost;
mod worker;
type RenderTaskSender = crossbeam_channel::Sender<RenderTask>;
type CoarseTaskSender = ordered_channel::Sender<CoarseTask>;
type CoarseTaskReceiver = ordered_channel::Receiver<CoarseTask>;
/// A dispatcher that distributes strip generation across a rayon thread pool,
/// while coarse rasterization of the resulting commands happens on the calling
/// thread.
pub(crate) struct MultiThreadedDispatcher {
    // Wide-tile container into which coarse commands are replayed.
    wide: Wide,
    // Tracks the currently pushed clip paths (generated on the main thread).
    clip_context: ClipContext,
    // The rayon pool running the strip-generation workers.
    thread_pool: ThreadPool,
    // Buffers for the batch of render tasks currently being recorded.
    allocation_group: AllocationGroup,
    // Estimated cost of the current batch; the batch is flushed to the workers
    // once this exceeds `COST_THRESHOLD` (see `register_task`).
    batch_cost: f32,
    // Sender for render tasks; `None` between `flush`/`reset` and the next `init`.
    task_sender: Option<RenderTaskSender>,
    // Thread-local worker state, one entry per pool thread.
    workers: Arc<ThreadLocal<RefCell<Worker>>>,
    // Receives coarse tasks from the workers in submission order.
    coarse_task_receiver: Option<CoarseTaskReceiver>,
    // Per-thread alpha buffers; slot 0 belongs to the main thread (see `flush`),
    // worker ids start at 1 (see `new`).
    alpha_storage: MaybePresent<Vec<Vec<u8>>>,
    // Monotonically increasing index attached to each render task batch.
    task_idx: u32,
    // Number of pool threads.
    num_threads: u16,
    // Strip generator used on the main thread (for clip paths).
    strip_generator: StripGenerator,
    // Strip storage used on the main thread; its alphas move to slot 0 on flush.
    strip_storage: StripStorage,
    // SIMD level used for strip generation and fine rasterization.
    level: Level,
    // Whether all pending work has been flushed (required before `rasterize`).
    flushed: bool,
    // Pools of reusable buffers, recycled between batches.
    allocations: Allocations,
    // Render graph passed to the wide-tile layer push/pop operations.
    render_graph: RenderGraph,
}
impl MultiThreadedDispatcher {
    /// Create a new dispatcher rendering into a `width` x `height` target,
    /// using `num_threads` pool workers at the given SIMD `level`.
    pub(crate) fn new(width: u16, height: u16, num_threads: u16, level: Level) -> Self {
        let wide = Wide::<MODE_CPU>::new(width, height);
        let thread_pool = ThreadPoolBuilder::new()
            .num_threads(num_threads as usize)
            .build()
            .unwrap();
        // One alpha slot per pool thread, plus slot 0 for the main thread.
        // Convert to `usize` *before* adding one so `num_threads == u16::MAX`
        // cannot overflow the `u16` addition (matches the
        // `usize::from(self.num_threads) + 1` pattern already used in `reset`).
        let alpha_storage = MaybePresent::new(vec![vec![]; usize::from(num_threads) + 1]);
        let workers = Arc::new(ThreadLocal::new());
        {
            // Hand out stable worker ids starting at 1 (0 is reserved for the
            // main thread) and eagerly construct each thread-local `Worker`.
            let thread_ids = Arc::new(AtomicU8::new(1));
            let workers = workers.clone();
            thread_pool.spawn_broadcast(move |_| {
                let thread_id = thread_ids.fetch_add(1, Ordering::SeqCst);
                let worker = Worker::new(width, height, thread_id, level);
                let _ = workers.get_or(|| RefCell::new(worker));
            });
        }
        let task_idx = 0;
        let batch_cost = 0.0;
        let flushed = false;
        let mut dispatcher = Self {
            wide,
            thread_pool,
            allocations: Allocations::default(),
            allocation_group: AllocationGroup::default(),
            batch_cost,
            task_idx,
            flushed,
            workers,
            clip_context: ClipContext::new(),
            task_sender: None,
            coarse_task_receiver: None,
            strip_generator: StripGenerator::new(width, height, level),
            strip_storage: StripStorage::default(),
            level,
            alpha_storage,
            num_threads,
            render_graph: RenderGraph::new(),
        };
        dispatcher.init();
        dispatcher
    }

    /// Rasterize using the f32 fine-rasterization kernel.
    #[cfg(feature = "f32_pipeline")]
    fn rasterize_f32(
        &self,
        buffer: &mut [u8],
        width: u16,
        height: u16,
        encoded_paints: &[EncodedPaint],
        image_resolver: &dyn ImageResolver,
    ) {
        use crate::fine::F32Kernel;
        dispatch!(self.level, simd => self.rasterize_with::<_, F32Kernel>(simd, buffer, width, height, encoded_paints, image_resolver));
    }

    /// Rasterize using the u8 fine-rasterization kernel.
    #[cfg(feature = "u8_pipeline")]
    fn rasterize_u8(
        &self,
        buffer: &mut [u8],
        width: u16,
        height: u16,
        encoded_paints: &[EncodedPaint],
        image_resolver: &dyn ImageResolver,
    ) {
        use crate::fine::U8Kernel;
        dispatch!(self.level, simd => self.rasterize_with::<_, U8Kernel>(simd, buffer, width, height, encoded_paints, image_resolver));
    }

    /// Wire up fresh task channels and spawn the per-thread render loops.
    ///
    /// Each pool thread repeatedly pulls `RenderTask`s from an unbounded
    /// channel and pushes the resulting `CoarseTask`s into an ordered channel,
    /// so the main thread observes them in submission order.
    fn init(&mut self) {
        let (render_task_sender, render_task_receiver) = crossbeam_channel::unbounded();
        let (coarse_task_sender, coarse_task_receiver) = ordered_channel::unbounded();
        let workers = self.workers.clone();
        let alpha_storage = self.alpha_storage.clone();
        self.task_sender = Some(render_task_sender);
        self.coarse_task_receiver = Some(coarse_task_receiver);
        self.thread_pool.spawn_broadcast(move |_| {
            let render_task_receiver = render_task_receiver.clone();
            let mut coarse_task_sender = coarse_task_sender.clone();
            let worker = workers.get().unwrap();
            let mut worker = worker.borrow_mut();
            let thread_id = worker.thread_id();
            // Hand this worker its alpha buffer from the shared storage.
            alpha_storage
                .with_inner(|alphas| worker.init(std::mem::take(&mut alphas[thread_id as usize])));
            // Process render tasks until the sender is dropped (see `flush`/`reset`).
            while let Ok(task) = render_task_receiver.recv() {
                worker.run_render_task(task, &mut coarse_task_sender);
            }
            // Return the (possibly grown) alpha buffer for fine rasterization.
            alpha_storage.with_inner(|alphas| {
                alphas[thread_id as usize] = worker.finalize();
            });
            drop(coarse_task_sender);
        });
    }

    /// Append `task` to the current batch, flushing the batch to the workers
    /// once its estimated cost crosses `COST_THRESHOLD`.
    fn register_task(&mut self, task: RenderTaskType) {
        self.flushed = false;
        // `flush` tears down the channels; lazily rebuild them here.
        if self.task_sender.is_none() {
            self.init();
        }
        let cost = estimate_render_task_cost(&task, &self.allocation_group.path);
        self.allocation_group.render_tasks.push(task);
        self.batch_cost += cost;
        if self.batch_cost > COST_THRESHOLD {
            self.flush_tasks();
        }
    }

    /// Send the current batch to the workers and reset the cost accumulator.
    fn flush_tasks(&mut self) {
        self.send_pending_tasks();
        self.batch_cost = 0.0;
    }

    /// Return the current task index and advance it.
    fn bump_task_idx(&mut self) -> u32 {
        let idx = self.task_idx;
        self.task_idx += 1;
        idx
    }

    /// Move the current batch into a `RenderTask`, send it to the workers, and
    /// opportunistically drain any coarse tasks that are already available.
    fn send_pending_tasks(&mut self) {
        let task_idx = self.bump_task_idx();
        // Swap in a recycled buffer group for the next batch.
        let allocation_group =
            std::mem::replace(&mut self.allocation_group, self.allocations.get());
        let task_sender = self.task_sender.as_mut().unwrap();
        // Snapshot the currently active clip path so the worker can apply it.
        let clip_path = self.clip_context.get().map(|c| OwnedClip {
            strips: c.strips.into(),
            alphas: c.alphas.into(),
        });
        let task = RenderTask {
            idx: task_idx,
            clip_path,
            allocation_group,
        };
        task_sender.send(task).unwrap();
        // NOTE(review): drains with no encoded paints — assumes paints are only
        // required when draining at flush time; confirm.
        self.run_coarse(true, &[]);
    }

    /// Drain coarse tasks produced by the workers and replay them into the
    /// wide-tile container.
    ///
    /// If `abort_empty` is true, return as soon as the channel is momentarily
    /// empty (used while recording); otherwise spin until all senders have
    /// been dropped (used during `flush`).
    fn run_coarse(&mut self, abort_empty: bool, encoded_paints: &[EncodedPaint]) {
        let result_receiver = self.coarse_task_receiver.as_mut().unwrap();
        loop {
            match result_receiver.try_recv() {
                Ok(mut task) => {
                    let num_tasks = task.allocation_group.coarse_tasks.len();
                    for cmd in task.allocation_group.coarse_tasks.drain(0..num_tasks) {
                        match cmd {
                            CoarseTaskType::RenderPath {
                                strips: strip_range,
                                paint,
                                blend_mode,
                                thread_id,
                                mask,
                            } => self.wide.generate(
                                &task.allocation_group.strips
                                    [strip_range.start as usize..strip_range.end as usize],
                                paint.clone(),
                                blend_mode,
                                thread_id,
                                mask,
                                encoded_paints,
                            ),
                            CoarseTaskType::RenderWideCommand {
                                strips,
                                blend_mode,
                                paint,
                                thread_id,
                                mask,
                            } => self.wide.generate(
                                &strips,
                                paint.clone(),
                                blend_mode,
                                thread_id,
                                mask,
                                encoded_paints,
                            ),
                            CoarseTaskType::PushLayer {
                                thread_id,
                                clip_path,
                                blend_mode,
                                mask,
                                opacity,
                            } => {
                                // Resolve the clip's strip range into a slice
                                // of this task's strip buffer.
                                let clip_path = clip_path.map(|strip_range| {
                                    &task.allocation_group.strips
                                        [strip_range.start as usize..strip_range.end as usize]
                                });
                                self.wide.push_layer(
                                    0,
                                    clip_path,
                                    blend_mode,
                                    mask,
                                    opacity,
                                    None,
                                    Affine::IDENTITY,
                                    &mut self.render_graph,
                                    thread_id,
                                );
                            }
                            CoarseTaskType::PopLayer => self.wide.pop_layer(&mut self.render_graph),
                        }
                    }
                    // Recycle the batch's buffers.
                    self.allocations.put(task.allocation_group);
                }
                Err(e) => match e {
                    TryRecvError::Empty => {
                        if abort_empty {
                            return;
                        }
                        // Otherwise busy-wait: more tasks are still in flight.
                    }
                    TryRecvError::Disconnected => return,
                },
            }
        }
    }

    /// Rasterize the recorded wide tiles into `buffer` using kernel `F`.
    ///
    /// Regions are processed in parallel on the thread pool; each pool thread
    /// lazily creates its own `Fine` state.
    fn rasterize_with<S: Simd, F: FineKernel<S>>(
        &self,
        simd: S,
        buffer: &mut [u8],
        width: u16,
        height: u16,
        encoded_paints: &[EncodedPaint],
        image_resolver: &dyn ImageResolver,
    ) {
        let mut buffer = Regions::new(width, height, buffer);
        let fines = ThreadLocal::new();
        let wide = &self.wide;
        // Temporarily take the per-thread alpha buffers out of the shared slot;
        // they are put back after rasterization below.
        let alpha_slots = self.alpha_storage.take();
        self.thread_pool.install(|| {
            buffer.update_regions_par(|region| {
                let x = region.x;
                let y = region.y;
                let mut fine = fines
                    .get_or(|| RefCell::new(Fine::<S, F>::new(simd)))
                    .borrow_mut();
                let wtile = wide.get(x, y);
                fine.set_coords(x, y);
                fine.clear(wtile.bg);
                for cmd in &wtile.cmds {
                    // Alpha values live in the buffer of the thread that
                    // generated the command, if any.
                    let thread_idx = match cmd {
                        Cmd::AlphaFill(a) => Some(wide.attrs.fill[a.attrs_idx as usize].thread_idx),
                        Cmd::ClipStrip(a) => Some(wide.attrs.clip[a.attrs_idx as usize].thread_idx),
                        _ => None,
                    };
                    let alphas = thread_idx
                        .map(|i| alpha_slots[i as usize].as_slice())
                        .unwrap_or(&[]);
                    fine.run_cmd(cmd, alphas, encoded_paints, image_resolver, &wide.attrs);
                }
                fine.pack(region);
            });
        });
        self.alpha_storage.init(alpha_slots);
    }
}
impl Dispatcher for MultiThreadedDispatcher {
    fn wide(&self) -> &Wide {
        &self.wide
    }

    /// Record a fill task; the path is copied into the batch buffer and the
    /// task references it by index range.
    fn fill_path(
        &mut self,
        path: &BezPath,
        fill_rule: Fill,
        transform: Affine,
        paint: Paint,
        blend_mode: BlendMode,
        aliasing_threshold: Option<u8>,
        mask: Option<Mask>,
        _encoded_paints: &[EncodedPaint],
    ) {
        let start = self.allocation_group.path.len() as u32;
        self.allocation_group.path.extend(path);
        let end = self.allocation_group.path.len() as u32;
        self.register_task(RenderTaskType::FillPath {
            path_range: start..end,
            transform,
            paint,
            fill_rule,
            blend_mode,
            aliasing_threshold,
            mask,
        });
    }

    /// Record a stroke task; the path is copied into the batch buffer and the
    /// task references it by index range.
    fn stroke_path(
        &mut self,
        path: &BezPath,
        stroke: &Stroke,
        transform: Affine,
        paint: Paint,
        blend_mode: BlendMode,
        aliasing_threshold: Option<u8>,
        mask: Option<Mask>,
        _encoded_paints: &[EncodedPaint],
    ) {
        let start = self.allocation_group.path.len() as u32;
        self.allocation_group.path.extend(path);
        let end = self.allocation_group.path.len() as u32;
        self.register_task(RenderTaskType::StrokePath {
            path_range: start..end,
            transform,
            paint,
            stroke: stroke.clone(),
            blend_mode,
            aliasing_threshold,
            mask,
        });
    }

    /// Record an axis-aligned rectangle fill by encoding the rect as a
    /// four-line closed path and registering a regular fill task.
    fn fill_rect_fast(
        &mut self,
        rect: &Rect,
        paint: Paint,
        blend_mode: BlendMode,
        mask: Option<Mask>,
        _encoded_paints: &[EncodedPaint],
    ) {
        let start = self.allocation_group.path.len() as u32;
        self.allocation_group.path.extend([
            PathEl::MoveTo(Point::new(rect.x0, rect.y0)),
            PathEl::LineTo(Point::new(rect.x1, rect.y0)),
            PathEl::LineTo(Point::new(rect.x1, rect.y1)),
            PathEl::LineTo(Point::new(rect.x0, rect.y1)),
            PathEl::ClosePath,
        ]);
        let end = self.allocation_group.path.len() as u32;
        self.register_task(RenderTaskType::FillPath {
            path_range: start..end,
            transform: Affine::IDENTITY,
            paint,
            fill_rule: Fill::NonZero,
            blend_mode,
            aliasing_threshold: None,
            mask,
        });
    }

    /// Record a layer push, optionally clipped by `clip_path` (copied into the
    /// batch buffer).
    ///
    /// Panics if a filter is supplied, as filters are not yet supported here.
    fn push_layer(
        &mut self,
        clip_path: Option<&BezPath>,
        fill_rule: Fill,
        clip_transform: Affine,
        blend_mode: BlendMode,
        opacity: f32,
        aliasing_threshold: Option<u8>,
        mask: Option<Mask>,
        filter: Option<Filter>,
    ) {
        if filter.is_some() {
            unimplemented!("Filter effects are not yet supported in multi-threaded rendering");
        }
        let mapped_clip = clip_path.map(|c| {
            let start = self.allocation_group.path.len() as u32;
            self.allocation_group.path.extend(c);
            let end = self.allocation_group.path.len() as u32;
            (start..end, clip_transform)
        });
        self.register_task(RenderTaskType::PushLayer {
            clip_path: mapped_clip,
            blend_mode,
            opacity,
            mask,
            fill_rule,
            aliasing_threshold,
        });
    }

    fn pop_layer(&mut self) {
        self.register_task(RenderTaskType::PopLayer);
    }

    /// Reset all state so the dispatcher can record a new scene.
    fn reset(&mut self) {
        self.wide.reset();
        self.clip_context.reset();
        self.allocation_group.clear();
        self.batch_cost = 0.0;
        self.task_idx = 0;
        self.flushed = false;
        // Dropping the sender terminates the worker loops spawned in `init`.
        self.task_sender = None;
        self.coarse_task_receiver = None;
        self.strip_generator.reset();
        self.strip_storage.clear();
        self.alpha_storage.with_inner(|alphas| {
            for alpha in alphas {
                alpha.clear();
            }
        });
        // Reset every thread-local worker; the barrier (sized for all pool
        // threads plus this one) ensures they are all done before re-`init`.
        let workers = self.workers.clone();
        let barrier = Arc::new(Barrier::new(usize::from(self.num_threads) + 1));
        let t_barrier = barrier.clone();
        self.thread_pool.spawn_broadcast(move |_| {
            let worker = workers.get().unwrap();
            let mut borrowed = worker.borrow_mut();
            borrowed.reset();
            t_barrier.wait();
        });
        barrier.wait();
        self.init();
    }

    /// Finish all pending work so that `rasterize` can run. Idempotent until
    /// the next task is registered.
    fn flush(&mut self, encoded_paints: &[EncodedPaint]) {
        if self.flushed {
            return;
        }
        self.flush_tasks();
        // Dropping the sender disconnects the render channel, which makes the
        // worker loops exit and finalize their alpha buffers.
        let sender = core::mem::take(&mut self.task_sender);
        drop(sender);
        // Block until every in-flight coarse task has been replayed.
        self.run_coarse(false, encoded_paints);
        // Slot 0 holds the main thread's alphas (e.g. from clip paths).
        self.alpha_storage.with_inner(|alphas| {
            alphas[0] = std::mem::take(&mut self.strip_storage.alphas);
        });
        self.flushed = true;
    }

    /// Rasterize the flushed scene into `buffer`, choosing the fine kernel
    /// from the enabled pipeline features and, when both are available, from
    /// `render_mode` (u8 for speed, f32 for quality).
    fn rasterize(
        &self,
        buffer: &mut [u8],
        render_mode: RenderMode,
        width: u16,
        height: u16,
        encoded_paints: &[EncodedPaint],
        image_resolver: &dyn ImageResolver,
    ) {
        assert!(self.flushed, "attempted to rasterize before flushing");
        #[cfg(all(feature = "u8_pipeline", not(feature = "f32_pipeline")))]
        {
            let _ = render_mode;
            self.rasterize_u8(buffer, width, height, encoded_paints, image_resolver);
        }
        #[cfg(all(feature = "f32_pipeline", not(feature = "u8_pipeline")))]
        {
            let _ = render_mode;
            self.rasterize_f32(buffer, width, height, encoded_paints, image_resolver);
        }
        #[cfg(all(feature = "f32_pipeline", feature = "u8_pipeline"))]
        match render_mode {
            RenderMode::OptimizeSpeed => {
                self.rasterize_u8(buffer, width, height, encoded_paints, image_resolver);
            }
            RenderMode::OptimizeQuality => {
                self.rasterize_f32(buffer, width, height, encoded_paints, image_resolver);
            }
        }
    }

    /// Not supported for the multi-threaded dispatcher; always panics.
    fn composite_at_offset(
        &self,
        _buffer: &mut [u8],
        _width: u16,
        _height: u16,
        _dst_x: u16,
        _dst_y: u16,
        _dst_buffer_width: u16,
        _dst_buffer_height: u16,
        _render_mode: RenderMode,
        _encoded_paints: &[EncodedPaint],
        _image_resolver: &dyn ImageResolver,
    ) {
        unimplemented!("composite_at_offset is not implemented for multi-threaded dispatcher");
    }

    /// Record a wide command from pre-generated strips. `thread_idx` is 0
    /// because the strips were generated on the main thread.
    fn generate_wide_cmd(
        &mut self,
        strip_buf: &[Strip],
        paint: Paint,
        blend_mode: BlendMode,
        _encoded_paints: &[EncodedPaint],
    ) {
        self.register_task(RenderTaskType::WideCommand {
            strip_buf: strip_buf.into(),
            thread_idx: 0,
            paint,
            blend_mode,
        });
    }

    fn strip_storage_mut(&mut self) -> &mut StripStorage {
        &mut self.strip_storage
    }

    /// Push a clip path. The batch is flushed first so that previously
    /// recorded tasks are sent with the clip state that was active when they
    /// were recorded.
    fn push_clip_path(
        &mut self,
        path: &BezPath,
        fill_rule: Fill,
        transform: Affine,
        aliasing_threshold: Option<u8>,
    ) {
        self.flush_tasks();
        self.clip_context.push_clip(
            path,
            &mut self.strip_generator,
            fill_rule,
            transform,
            aliasing_threshold,
        );
    }

    /// Pop the innermost clip path, flushing pending tasks first (see
    /// `push_clip_path`).
    fn pop_clip_path(&mut self) {
        self.flush_tasks();
        self.clip_context.pop_clip();
    }
}
impl Debug for MultiThreadedDispatcher {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        // The dispatcher owns thread pools, channels, and thread-local state,
        // so only an opaque placeholder is printed.
        write!(f, "MultiThreadedDispatcher {{ .. }}")
    }
}
/// Clip strips and alpha values copied out of the clip context so that they
/// can be sent to a worker thread along with a `RenderTask`.
#[derive(Debug)]
pub(crate) struct OwnedClip {
    // The strips making up the clip path.
    strips: Box<[Strip]>,
    // The alpha values generated alongside those strips.
    alphas: Box<[u8]>,
}
/// A simple free-list of reusable `Vec<T>` buffers.
struct AllocationManager<T> {
    // Cleared vectors available for reuse; their capacity is retained.
    entries: Vec<Vec<T>>,
}

impl<T> AllocationManager<T> {
    /// Hand out a recycled vector, or a fresh empty one if none is pooled.
    fn get(&mut self) -> Vec<T> {
        match self.entries.pop() {
            Some(buffer) => buffer,
            None => Vec::new(),
        }
    }

    /// Return a vector to the pool, emptying its contents while keeping the
    /// backing allocation alive.
    fn put(&mut self, mut buffer: Vec<T>) {
        buffer.clear();
        self.entries.push(buffer);
    }
}

impl<T> Default for AllocationManager<T> {
    fn default() -> Self {
        Self {
            entries: Vec::new(),
        }
    }
}
/// Pools of reusable buffers, one `AllocationManager` per buffer type that
/// appears in an `AllocationGroup`.
#[derive(Default)]
struct Allocations {
    render_tasks: AllocationManager<RenderTaskType>,
    paths: AllocationManager<PathEl>,
    strips: AllocationManager<Strip>,
    coarse_tasks: AllocationManager<CoarseTaskType>,
}
impl Allocations {
    /// Assemble a full `AllocationGroup` by pulling one buffer from each pool.
    fn get(&mut self) -> AllocationGroup {
        AllocationGroup {
            path: self.paths.get(),
            render_tasks: self.render_tasks.get(),
            coarse_tasks: self.coarse_tasks.get(),
            strips: self.strips.get(),
        }
    }

    /// Break a group apart and return each buffer to its respective pool.
    fn put(&mut self, group: AllocationGroup) {
        self.render_tasks.put(group.render_tasks);
        self.paths.put(group.path);
        self.strips.put(group.strips);
        self.coarse_tasks.put(group.coarse_tasks);
    }
}
/// The set of buffers that travels with a batch of render tasks from the main
/// thread to a worker and back again (as part of a `CoarseTask`).
#[derive(Default, Debug)]
pub(crate) struct AllocationGroup {
    // Flattened path elements, referenced by `render_tasks` via index ranges.
    pub(crate) path: Vec<PathEl>,
    // The render tasks recorded for this batch.
    pub(crate) render_tasks: Vec<RenderTaskType>,
    // Strips produced by the worker while processing `render_tasks`.
    pub(crate) strips: Vec<Strip>,
    // Coarse tasks produced by the worker, replayed on the main thread.
    pub(crate) coarse_tasks: Vec<CoarseTaskType>,
}
impl AllocationGroup {
    /// Empty every buffer while keeping the underlying allocations alive.
    fn clear(&mut self) {
        let Self {
            path,
            render_tasks,
            strips,
            coarse_tasks,
        } = self;
        path.clear();
        render_tasks.clear();
        strips.clear();
        coarse_tasks.clear();
    }
}
/// A batch of render tasks sent to a worker thread for strip generation.
#[derive(Debug)]
pub(crate) struct RenderTask {
    // Sequence number of this batch (presumably used by the ordered channel to
    // keep coarse results in submission order — see `ordered_channel`).
    pub(crate) idx: u32,
    // The clip path active at the time the batch was flushed, if any.
    pub(crate) clip_path: Option<OwnedClip>,
    // The buffers holding the batch's tasks and path data.
    pub(crate) allocation_group: AllocationGroup,
}
/// A single drawing operation recorded by the dispatcher and executed by a
/// worker thread.
#[derive(Debug, Clone)]
pub(crate) enum RenderTaskType {
    /// Fill a path, referenced by a range into `AllocationGroup::path`.
    FillPath {
        path_range: Range<u32>,
        transform: Affine,
        paint: Paint,
        fill_rule: Fill,
        blend_mode: BlendMode,
        aliasing_threshold: Option<u8>,
        mask: Option<Mask>,
    },
    /// Forward pre-generated strips as a wide command.
    WideCommand {
        strip_buf: Box<[Strip]>,
        // Id of the thread whose alpha buffer the strips reference
        // (0 = main thread; see `generate_wide_cmd`).
        thread_idx: u8,
        paint: Paint,
        blend_mode: BlendMode,
    },
    /// Stroke a path, referenced by a range into `AllocationGroup::path`.
    StrokePath {
        path_range: Range<u32>,
        transform: Affine,
        paint: Paint,
        stroke: Stroke,
        blend_mode: BlendMode,
        aliasing_threshold: Option<u8>,
        mask: Option<Mask>,
    },
    /// Push a new layer, optionally clipped by a path range with its own
    /// transform.
    PushLayer {
        clip_path: Option<(Range<u32>, Affine)>,
        blend_mode: BlendMode,
        opacity: f32,
        mask: Option<Mask>,
        fill_rule: Fill,
        aliasing_threshold: Option<u8>,
    },
    /// Pop the most recently pushed layer.
    PopLayer,
}
/// The result of a worker processing a `RenderTask`: the same buffer group,
/// now containing the generated strips and the coarse tasks to be replayed on
/// the main thread (see `run_coarse`).
pub(crate) struct CoarseTask {
    allocation_group: AllocationGroup,
}
/// A coarse-rasterization command produced by a worker thread and replayed on
/// the main thread in `run_coarse`.
#[derive(Debug)]
pub(crate) enum CoarseTaskType {
    /// Render strips stored in the accompanying `AllocationGroup::strips`.
    RenderPath {
        // Id of the worker whose alpha buffer the strips reference.
        thread_id: u8,
        // Range into `AllocationGroup::strips`.
        strips: Range<u32>,
        blend_mode: BlendMode,
        paint: Paint,
        mask: Option<Mask>,
    },
    /// Render strips that were captured by value (from a wide command).
    RenderWideCommand {
        thread_id: u8,
        strips: Box<[Strip]>,
        paint: Paint,
        blend_mode: BlendMode,
        mask: Option<Mask>,
    },
    /// Push a layer, optionally clipped by strips in the group's buffer.
    PushLayer {
        thread_id: u8,
        // Range into `AllocationGroup::strips` for the clip, if any.
        clip_path: Option<Range<u32>>,
        blend_mode: BlendMode,
        mask: Option<Mask>,
        opacity: f32,
    },
    /// Pop the most recently pushed layer.
    PopLayer,
}
/// A value shared between threads that may be temporarily moved out of its
/// slot (via `take`) and later restored (via `init`).
///
/// Accessing the value while it is absent is a logic error and panics.
#[derive(Clone)]
pub(crate) struct MaybePresent<T: Default> {
    // Whether `value` currently holds meaningful contents.
    present: Arc<AtomicBool>,
    // The (possibly taken) value itself.
    value: Arc<Mutex<T>>,
}

impl<T: Default> MaybePresent<T> {
    /// Create a new, initialized `MaybePresent` holding `val`.
    pub(crate) fn new(val: T) -> Self {
        Self {
            present: Arc::new(AtomicBool::new(true)),
            value: Arc::new(Mutex::new(val)),
        }
    }

    /// Put a value back into the slot, marking it as present again.
    pub(crate) fn init(&self, value: T) {
        let mut locked = self.value.lock().unwrap();
        *locked = value;
        self.present.store(true, Ordering::SeqCst);
    }

    /// Run `func` with mutable access to the inner value.
    ///
    /// The closure is invoked exactly once, so the bound is `FnOnce` rather
    /// than the needlessly restrictive `FnMut`; every `FnMut` closure still
    /// satisfies it, so all existing callers keep working.
    ///
    /// # Panics
    /// Panics if the value has been `take`n and not re-`init`ialized.
    pub(crate) fn with_inner(&self, func: impl FnOnce(&mut T)) {
        assert!(
            self.present.load(Ordering::SeqCst),
            "Tried to access `MaybePresent` before initialization."
        );
        let mut lock = self.value.lock().unwrap();
        func(&mut lock);
    }

    /// Move the value out, leaving `T::default()` behind and marking the slot
    /// as absent until the next `init`.
    ///
    /// # Panics
    /// Panics if the value is already absent.
    pub(crate) fn take(&self) -> T {
        assert!(
            self.present.load(Ordering::SeqCst),
            "Tried to access `MaybePresent` before initialization."
        );
        let mut locked = self.value.lock().unwrap();
        self.present.store(false, Ordering::SeqCst);
        std::mem::take(&mut *locked)
    }
}
#[cfg(test)]
mod tests {
    use crate::Level;
    use crate::color::palette::css::BLUE;
    use crate::dispatch::Dispatcher;
    use crate::dispatch::multi_threaded::MultiThreadedDispatcher;
    use crate::kurbo::{Affine, Rect, Shape};
    use crate::peniko::{BlendMode, Fill};
    use vello_common::paint::{Paint, PremulColor};

    /// Repeatedly filling and flushing must leave exactly one recycled buffer
    /// in each pool: allocation groups are reused across batches rather than
    /// re-allocated every iteration.
    #[test]
    fn allocations() {
        let mut dispatcher = MultiThreadedDispatcher::new(100, 100, 4, Level::new());
        for _ in 0..20 {
            dispatcher.fill_path(
                &Rect::new(0.0, 0.0, 50.0, 50.0).to_path(0.1),
                Fill::NonZero,
                Affine::IDENTITY,
                Paint::Solid(PremulColor::from_alpha_color(BLUE)),
                BlendMode::default(),
                None,
                None,
                &[],
            );
            dispatcher.flush(&[]);
        }
        // One pooled buffer per manager — no unbounded growth.
        assert_eq!(dispatcher.allocations.paths.entries.len(), 1);
        assert_eq!(dispatcher.allocations.strips.entries.len(), 1);
        assert_eq!(dispatcher.allocations.render_tasks.entries.len(), 1);
        assert_eq!(dispatcher.allocations.coarse_tasks.entries.len(), 1);
    }
}