1use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::ptr::NonNull;
6use std::thread::JoinHandle;
7use tokio::sync::mpsc::{Sender, WeakSender};
8
9use super::processor::GLProcessorST;
10use super::shaders::check_gl_error;
11use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
12use crate::{Crop, Error, Flip, ImageProcessorTrait, MaskRegion, Rotation};
13use edgefirst_tensor::TensorDyn;
14
/// Requests sent from [`GLProcessorThreaded`] (and from PBO handles) to the
/// GL worker thread.
///
/// Except for `PboDelete`, the last field of every variant is a oneshot
/// sender on which the worker reports the outcome of the request.
// The reply-channel types are long, which is presumably what this lint
// exemption is for.
#[allow(clippy::type_complexity)]
enum GLProcessorMessage {
    /// Convert an image: source tensor, destination tensor, rotation, flip,
    /// crop, reply channel.
    ImageConvert(
        SendablePtr<TensorDyn>,
        SendablePtr<TensorDyn>,
        Rotation,
        Flip,
        Crop,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Replace the class colour table: colours, reply channel.
    SetColors(
        Vec<[u8; 4]>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Draw masks into a destination tensor: destination, detection boxes
    /// (pointer + length), segmentations (pointer + length), reply channel.
    DrawMasks(
        SendablePtr<TensorDyn>,
        SendablePtr<DetectBox>,
        SendablePtr<Segmentation>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Draw masks from prototype data: destination, detection boxes
    /// (pointer + length), owned prototype data, reply channel.
    DrawMasksProto(
        SendablePtr<TensorDyn>,
        SendablePtr<DetectBox>,
        Box<ProtoData>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Select the int8 interpolation mode: mode, reply channel.
    SetInt8Interpolation(
        Int8InterpolationMode,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Decode masks into an atlas: detection boxes (pointer + length), owned
    /// prototype data, output width, output height, reply channel.
    DecodeMasksAtlas(
        SendablePtr<DetectBox>,
        Box<ProtoData>,
        // Output width.
        usize,
        // Output height.
        usize,
        tokio::sync::oneshot::Sender<Result<(Vec<u8>, Vec<MaskRegion>), Error>>,
    ),
    /// Allocate a pixel-pack buffer: size in bytes, reply channel carrying
    /// the new GL buffer id.
    PboCreate(
        // Buffer size in bytes.
        usize,
        tokio::sync::oneshot::Sender<Result<u32, Error>>,
    ),
    /// Map a pixel-pack buffer: buffer id, number of bytes to map, reply
    /// channel carrying the mapping.
    PboMap(
        // GL buffer id.
        u32,
        // Number of bytes to map, starting at offset 0.
        usize,
        tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
    ),
    /// Unmap a pixel-pack buffer: buffer id, reply channel.
    PboUnmap(
        // GL buffer id.
        u32,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Delete a pixel-pack buffer by id. Fire-and-forget: there is no reply
    /// channel.
    PboDelete(u32),
}
67
/// [`edgefirst_tensor::PboOps`] implementation that forwards PBO map, unmap
/// and delete requests to the GL worker thread over its message channel.
struct GlPboOps {
    /// Weak handle to the worker's request channel. It is upgraded on every
    /// call; when the upgrade fails, map/unmap report `PboDisconnected` and
    /// delete does nothing.
    sender: WeakSender<GLProcessorMessage>,
}
77
78unsafe impl edgefirst_tensor::PboOps for GlPboOps {
83 fn map_buffer(
84 &self,
85 buffer_id: u32,
86 size: usize,
87 ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
88 let sender = self
89 .sender
90 .upgrade()
91 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
92 let (tx, rx) = tokio::sync::oneshot::channel();
93 sender
94 .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
95 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
96 rx.blocking_recv()
97 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
98 .map_err(|e| {
99 edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
100 })
101 }
102
103 fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
104 let sender = self
105 .sender
106 .upgrade()
107 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
108 let (tx, rx) = tokio::sync::oneshot::channel();
109 sender
110 .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
111 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
112 rx.blocking_recv()
113 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
114 .map_err(|e| {
115 edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
116 })
117 }
118
119 fn delete_buffer(&self, buffer_id: u32) {
120 if let Some(sender) = self.sender.upgrade() {
121 let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
122 }
123 }
124}
125
/// Image processor that performs its OpenGL work on a dedicated thread and
/// communicates with that thread through a message channel.
#[derive(Debug)]
pub struct GLProcessorThreaded {
    /// Join handle of the GL worker thread; taken and joined in `Drop`.
    handle: Option<JoinHandle<()>>,

    /// Request channel to the worker thread. `Some` from construction until
    /// `Drop`, which takes it to close the channel.
    sender: Option<Sender<GLProcessorMessage>>,
    /// Transfer backend reported by the worker thread once its GL processor
    /// was created.
    transfer_backend: TransferBackend,
}
139
// SAFETY: NOTE(review) — the struct stores only a thread join handle, a
// channel sender and the `transfer_backend` value; no GL object is held
// directly. Confirm that `TransferBackend` is safe to move and share across
// threads, and whether these manual impls are still needed at all.
unsafe impl Send for GLProcessorThreaded {}
unsafe impl Sync for GLProcessorThreaded {}
142
/// Raw pointer plus element count, used to lend caller-owned data to the GL
/// worker thread inside a [`GLProcessorMessage`].
struct SendablePtr<T: Send> {
    /// Pointer to the first (or only) `T`.
    ptr: NonNull<T>,
    /// Number of `T` elements at `ptr`: 1 for a single object, the slice
    /// length for slices.
    len: usize,
}
147
// SAFETY: the senders in this file block on the reply channel after sending a
// message that contains a `SendablePtr`, so the pointee is still borrowed by
// the caller while the GL thread uses it.
// NOTE(review): some pointers are created from shared references
// (`&TensorDyn`, `&[DetectBox]`); strictly, reading through those on another
// thread calls for `T: Sync` rather than `T: Send` — confirm.
unsafe impl<T> Send for SendablePtr<T> where T: Send {}
149
150impl GLProcessorThreaded {
151 pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
153 let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
154
155 let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
156
157 let func = move || {
158 let mut gl_converter = match GLProcessorST::new(kind) {
159 Ok(gl) => gl,
160 Err(e) => {
161 let _ = create_ctx_send.send(Err(e));
162 return;
163 }
164 };
165 let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
166 while let Some(msg) = recv.blocking_recv() {
167 match msg {
168 GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
169 let src = unsafe { src.ptr.as_ref() };
172 let dst = unsafe { dst.ptr.as_mut() };
173 let res = gl_converter.convert(src, dst, rotation, flip, crop);
174 let _ = resp.send(res);
175 }
176 GLProcessorMessage::DrawMasks(mut dst, det, seg, resp) => {
177 let dst = unsafe { dst.ptr.as_mut() };
181 let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
182 let seg = unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
183 let res = gl_converter.draw_masks(dst, det, seg);
184 let _ = resp.send(res);
185 }
186 GLProcessorMessage::DrawMasksProto(mut dst, det, proto_data, resp) => {
187 let dst = unsafe { dst.ptr.as_mut() };
190 let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
191 let res = gl_converter.draw_masks_proto(dst, det, &proto_data);
192 let _ = resp.send(res);
193 }
194 GLProcessorMessage::SetColors(colors, resp) => {
195 let res = gl_converter.set_class_colors(&colors);
196 let _ = resp.send(res);
197 }
198 GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
199 gl_converter.set_int8_interpolation_mode(mode);
200 let _ = resp.send(Ok(()));
201 }
202 GLProcessorMessage::DecodeMasksAtlas(
203 det,
204 proto_data,
205 output_width,
206 output_height,
207 resp,
208 ) => {
209 let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
210 let res = gl_converter.decode_masks_atlas(
211 det,
212 &proto_data,
213 output_width,
214 output_height,
215 );
216 let _ = resp.send(res);
217 }
218 GLProcessorMessage::PboCreate(size, resp) => {
219 let result = unsafe {
220 let mut id: u32 = 0;
221 gls::gl::GenBuffers(1, &mut id);
222 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
223 gls::gl::BufferData(
224 gls::gl::PIXEL_PACK_BUFFER,
225 size as isize,
226 std::ptr::null(),
227 gls::gl::STREAM_COPY,
228 );
229 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
230 match check_gl_error("PboCreate", 0) {
231 Ok(()) => Ok(id),
232 Err(e) => {
233 gls::gl::DeleteBuffers(1, &id);
234 Err(e)
235 }
236 }
237 };
238 let _ = resp.send(result);
239 }
240 GLProcessorMessage::PboMap(buffer_id, size, resp) => {
241 let result = unsafe {
242 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
243 let ptr = gls::gl::MapBufferRange(
244 gls::gl::PIXEL_PACK_BUFFER,
245 0,
246 size as isize,
247 gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
248 );
249 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
250 if ptr.is_null() {
251 Err(crate::Error::OpenGl(
252 "glMapBufferRange returned null".to_string(),
253 ))
254 } else {
255 Ok(edgefirst_tensor::PboMapping {
256 ptr: ptr as *mut u8,
257 size,
258 })
259 }
260 };
261 let _ = resp.send(result);
262 }
263 GLProcessorMessage::PboUnmap(buffer_id, resp) => {
264 let result = unsafe {
265 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
266 let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
267 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
268 if ok == gls::gl::FALSE {
269 Err(Error::OpenGl(
270 "PBO data was corrupted during mapping".into(),
271 ))
272 } else {
273 check_gl_error("PboUnmap", 0)
274 }
275 };
276 let _ = resp.send(result);
277 }
278 GLProcessorMessage::PboDelete(buffer_id) => unsafe {
279 gls::gl::DeleteBuffers(1, &buffer_id);
280 },
281 }
282 }
283 };
284
285 let handle = std::thread::spawn(func);
287
288 let transfer_backend = match create_ctx_recv.blocking_recv() {
289 Ok(Err(e)) => return Err(e),
290 Err(_) => {
291 return Err(Error::Internal(
292 "GL converter error messaging closed without update".to_string(),
293 ));
294 }
295 Ok(Ok(tb)) => tb,
296 };
297
298 Ok(Self {
299 handle: Some(handle),
300 sender: Some(send),
301 transfer_backend,
302 })
303 }
304}
305
306impl ImageProcessorTrait for GLProcessorThreaded {
307 fn convert(
308 &mut self,
309 src: &TensorDyn,
310 dst: &mut TensorDyn,
311 rotation: crate::Rotation,
312 flip: Flip,
313 crop: Crop,
314 ) -> crate::Result<()> {
315 let (err_send, err_recv) = tokio::sync::oneshot::channel();
316 self.sender
317 .as_ref()
318 .unwrap()
319 .blocking_send(GLProcessorMessage::ImageConvert(
320 SendablePtr {
321 ptr: NonNull::from(src),
322 len: 1,
323 },
324 SendablePtr {
325 ptr: NonNull::from(dst),
326 len: 1,
327 },
328 rotation,
329 flip,
330 crop,
331 err_send,
332 ))
333 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
334 err_recv.blocking_recv().map_err(|_| {
335 Error::Internal("GL converter error messaging closed without update".to_string())
336 })?
337 }
338
339 fn draw_masks(
340 &mut self,
341 dst: &mut TensorDyn,
342 detect: &[crate::DetectBox],
343 segmentation: &[crate::Segmentation],
344 ) -> crate::Result<()> {
345 let (err_send, err_recv) = tokio::sync::oneshot::channel();
346 self.sender
347 .as_ref()
348 .unwrap()
349 .blocking_send(GLProcessorMessage::DrawMasks(
350 SendablePtr {
351 ptr: NonNull::from(dst),
352 len: 1,
353 },
354 SendablePtr {
355 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
356 len: detect.len(),
357 },
358 SendablePtr {
359 ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
360 len: segmentation.len(),
361 },
362 err_send,
363 ))
364 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
365 err_recv.blocking_recv().map_err(|_| {
366 Error::Internal("GL converter error messaging closed without update".to_string())
367 })?
368 }
369
370 fn draw_masks_proto(
371 &mut self,
372 dst: &mut TensorDyn,
373 detect: &[DetectBox],
374 proto_data: &ProtoData,
375 ) -> crate::Result<()> {
376 let (err_send, err_recv) = tokio::sync::oneshot::channel();
377 self.sender
378 .as_ref()
379 .unwrap()
380 .blocking_send(GLProcessorMessage::DrawMasksProto(
381 SendablePtr {
382 ptr: NonNull::from(dst),
383 len: 1,
384 },
385 SendablePtr {
386 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
387 len: detect.len(),
388 },
389 Box::new(proto_data.clone()),
390 err_send,
391 ))
392 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
393 err_recv.blocking_recv().map_err(|_| {
394 Error::Internal("GL converter error messaging closed without update".to_string())
395 })?
396 }
397
398 fn decode_masks_atlas(
399 &mut self,
400 detect: &[DetectBox],
401 proto_data: ProtoData,
402 output_width: usize,
403 output_height: usize,
404 ) -> crate::Result<(Vec<u8>, Vec<MaskRegion>)> {
405 GLProcessorThreaded::decode_masks_atlas(
406 self,
407 detect,
408 proto_data,
409 output_width,
410 output_height,
411 )
412 }
413
414 fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
415 let (err_send, err_recv) = tokio::sync::oneshot::channel();
416 self.sender
417 .as_ref()
418 .unwrap()
419 .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
420 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
421 err_recv.blocking_recv().map_err(|_| {
422 Error::Internal("GL converter error messaging closed without update".to_string())
423 })?
424 }
425}
426
427impl GLProcessorThreaded {
428 pub fn set_int8_interpolation_mode(
430 &mut self,
431 mode: Int8InterpolationMode,
432 ) -> Result<(), crate::Error> {
433 let (err_send, err_recv) = tokio::sync::oneshot::channel();
434 self.sender
435 .as_ref()
436 .unwrap()
437 .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
438 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
439 err_recv.blocking_recv().map_err(|_| {
440 Error::Internal("GL converter error messaging closed without update".to_string())
441 })?
442 }
443
444 pub fn decode_masks_atlas(
450 &mut self,
451 detect: &[DetectBox],
452 proto_data: ProtoData,
453 output_width: usize,
454 output_height: usize,
455 ) -> Result<(Vec<u8>, Vec<MaskRegion>), crate::Error> {
456 let (resp_send, resp_recv) = tokio::sync::oneshot::channel();
457 self.sender
458 .as_ref()
459 .unwrap()
460 .blocking_send(GLProcessorMessage::DecodeMasksAtlas(
461 SendablePtr {
462 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
463 len: detect.len(),
464 },
465 Box::new(proto_data),
466 output_width,
467 output_height,
468 resp_send,
469 ))
470 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
471 resp_recv.blocking_recv().map_err(|_| {
472 Error::Internal("GL converter error messaging closed without update".to_string())
473 })?
474 }
475
476 pub fn create_pbo_image(
478 &self,
479 width: usize,
480 height: usize,
481 format: edgefirst_tensor::PixelFormat,
482 ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
483 let sender = self
484 .sender
485 .as_ref()
486 .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
487
488 let channels = format.channels();
489 let size = match format.layout() {
490 edgefirst_tensor::PixelLayout::SemiPlanar => {
491 match format {
493 edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
494 edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
495 _ => width * height * channels,
496 }
497 }
498 edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
499 width * height * channels
500 }
501 _ => width * height * channels,
502 };
503 if size == 0 {
504 return Err(Error::OpenGl("Invalid image dimensions".to_string()));
505 }
506
507 let (tx, rx) = tokio::sync::oneshot::channel();
509 sender
510 .blocking_send(GLProcessorMessage::PboCreate(size, tx))
511 .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
512 let buffer_id = rx
513 .blocking_recv()
514 .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
515
516 let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
517 sender: sender.downgrade(),
518 });
519
520 let shape = match format.layout() {
521 edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
522 edgefirst_tensor::PixelLayout::SemiPlanar => {
523 let total_h = match format {
524 edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
525 edgefirst_tensor::PixelFormat::Nv16 => height * 2,
526 _ => height * 2,
527 };
528 vec![total_h, width]
529 }
530 _ => vec![height, width, channels],
531 };
532
533 let pbo_tensor =
534 edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
535 .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
536 let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
537 tensor
538 .set_format(format)
539 .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
540 Ok(tensor)
541 }
542
543 pub(crate) fn transfer_backend(&self) -> TransferBackend {
545 self.transfer_backend
546 }
547}
548
549impl Drop for GLProcessorThreaded {
550 fn drop(&mut self) {
551 drop(self.sender.take());
552 let _ = self.handle.take().and_then(|h| h.join().ok());
553 }
554}