1use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, Rotation};
14use edgefirst_tensor::TensorDyn;
15
16#[allow(clippy::type_complexity)]
17enum GLProcessorMessage {
18 ImageConvert(
19 SendablePtr<TensorDyn>,
20 SendablePtr<TensorDyn>,
21 Rotation,
22 Flip,
23 Crop,
24 tokio::sync::oneshot::Sender<Result<(), Error>>,
25 ),
26 SetColors(
27 Vec<[u8; 4]>,
28 tokio::sync::oneshot::Sender<Result<(), Error>>,
29 ),
30 DrawDecodedMasks(
31 SendablePtr<TensorDyn>,
32 SendablePtr<DetectBox>,
33 SendablePtr<Segmentation>,
34 f32, Option<SendablePtr<TensorDyn>>, Option<[f32; 4]>, crate::ColorMode,
38 tokio::sync::oneshot::Sender<Result<(), Error>>,
39 ),
40 DrawProtoMasks(
41 SendablePtr<TensorDyn>,
42 SendablePtr<DetectBox>,
43 SendablePtr<ProtoData>,
44 f32, Option<SendablePtr<TensorDyn>>, Option<[f32; 4]>, crate::ColorMode,
48 tokio::sync::oneshot::Sender<Result<(), Error>>,
49 ),
50 SetInt8Interpolation(
51 Int8InterpolationMode,
52 tokio::sync::oneshot::Sender<Result<(), Error>>,
53 ),
54 PboCreate(
55 usize, tokio::sync::oneshot::Sender<Result<u32, Error>>,
57 ),
58 PboMap(
59 u32, usize, tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
62 ),
63 PboUnmap(
64 u32, tokio::sync::oneshot::Sender<Result<(), Error>>,
66 ),
67 PboDelete(u32), }
69
70struct GlPboOps {
77 sender: WeakSender<GLProcessorMessage>,
78}
79
80unsafe impl edgefirst_tensor::PboOps for GlPboOps {
85 fn map_buffer(
86 &self,
87 buffer_id: u32,
88 size: usize,
89 ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
90 let sender = self
91 .sender
92 .upgrade()
93 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
94 let (tx, rx) = tokio::sync::oneshot::channel();
95 sender
96 .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
97 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
98 rx.blocking_recv()
99 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
100 .map_err(|e| {
101 edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
102 })
103 }
104
105 fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
106 let sender = self
107 .sender
108 .upgrade()
109 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
110 let (tx, rx) = tokio::sync::oneshot::channel();
111 sender
112 .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
113 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
114 rx.blocking_recv()
115 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
116 .map_err(|e| {
117 edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
118 })
119 }
120
121 fn delete_buffer(&self, buffer_id: u32) {
122 if let Some(sender) = self.sender.upgrade() {
123 let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
124 }
125 }
126}
127
128#[derive(Debug)]
133pub struct GLProcessorThreaded {
134 handle: Option<JoinHandle<()>>,
136
137 sender: Option<Sender<GLProcessorMessage>>,
139 transfer_backend: TransferBackend,
140}
141
142unsafe impl Send for GLProcessorThreaded {}
143unsafe impl Sync for GLProcessorThreaded {}
144
145struct SendablePtr<T: Send> {
146 ptr: NonNull<T>,
147 len: usize,
148}
149
150unsafe impl<T> Send for SendablePtr<T> where T: Send {}
151
152fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
154 if let Some(s) = info.downcast_ref::<&str>() {
155 s.to_string()
156 } else if let Some(s) = info.downcast_ref::<String>() {
157 s.clone()
158 } else {
159 "unknown panic".to_string()
160 }
161}
162
163impl GLProcessorThreaded {
164 pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
166 let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
167
168 let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
169
170 let func = move || {
171 let init_result = {
172 let _guard = super::context::GL_MUTEX
173 .lock()
174 .unwrap_or_else(|e| e.into_inner());
175 GLProcessorST::new(kind)
176 };
177 let mut gl_converter = match init_result {
178 Ok(gl) => gl,
179 Err(e) => {
180 let _ = create_ctx_send.send(Err(e));
181 return;
182 }
183 };
184 let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
185 let mut poisoned = false;
186 while let Some(msg) = recv.blocking_recv() {
187 let _guard = super::context::GL_MUTEX
190 .lock()
191 .unwrap_or_else(|e| e.into_inner());
192
193 if poisoned {
198 let poison_err = crate::Error::Internal(
199 "GL context is poisoned after a prior panic".to_string(),
200 );
201 match msg {
202 GLProcessorMessage::ImageConvert(.., resp) => {
203 let _ = resp.send(Err(poison_err));
204 }
205 GLProcessorMessage::DrawDecodedMasks(.., resp) => {
206 let _ = resp.send(Err(poison_err));
207 }
208 GLProcessorMessage::DrawProtoMasks(.., resp) => {
209 let _ = resp.send(Err(poison_err));
210 }
211 GLProcessorMessage::SetColors(_, resp) => {
212 let _ = resp.send(Err(poison_err));
213 }
214 GLProcessorMessage::SetInt8Interpolation(_, resp) => {
215 let _ = resp.send(Err(poison_err));
216 }
217 GLProcessorMessage::PboCreate(_, resp) => {
218 let _ = resp.send(Err(poison_err));
219 }
220 GLProcessorMessage::PboMap(_, _, resp) => {
221 let _ = resp.send(Err(poison_err));
222 }
223 GLProcessorMessage::PboUnmap(_, resp) => {
224 let _ = resp.send(Err(poison_err));
225 }
226 GLProcessorMessage::PboDelete(_) => {}
227 }
228 continue;
229 }
230
231 match msg {
232 GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
233 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
236 let src = unsafe { src.ptr.as_ref() };
237 let dst = unsafe { dst.ptr.as_mut() };
238 gl_converter.convert(src, dst, rotation, flip, crop)
239 }));
240 let _ = resp.send(match result {
241 Ok(res) => res,
242 Err(e) => {
243 poisoned = true;
244 Err(crate::Error::Internal(format!(
245 "GL thread panicked during ImageConvert: {}",
246 panic_message(e.as_ref()),
247 )))
248 }
249 });
250 }
251 GLProcessorMessage::DrawDecodedMasks(
252 mut dst,
253 det,
254 seg,
255 opacity,
256 bg,
257 letterbox,
258 color_mode,
259 resp,
260 ) => {
261 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
265 let dst = unsafe { dst.ptr.as_mut() };
266 let det =
267 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
268 let seg =
269 unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
270 let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
271 gl_converter.draw_decoded_masks(
272 dst,
273 det,
274 seg,
275 crate::MaskOverlay {
276 background: bg_ref,
277 opacity,
278 letterbox,
279 color_mode,
280 },
281 )
282 }));
283 let _ = resp.send(match result {
284 Ok(res) => res,
285 Err(e) => {
286 poisoned = true;
287 Err(crate::Error::Internal(format!(
288 "GL thread panicked during DrawDecodedMasks: {}",
289 panic_message(e.as_ref()),
290 )))
291 }
292 });
293 }
294 GLProcessorMessage::DrawProtoMasks(
295 mut dst,
296 det,
297 proto_data,
298 opacity,
299 bg,
300 letterbox,
301 color_mode,
302 resp,
303 ) => {
304 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
307 let dst = unsafe { dst.ptr.as_mut() };
308 let det =
309 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
310 let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
311 let proto_data = unsafe { proto_data.ptr.as_ref() };
312 gl_converter.draw_proto_masks(
313 dst,
314 det,
315 proto_data,
316 crate::MaskOverlay {
317 background: bg_ref,
318 opacity,
319 letterbox,
320 color_mode,
321 },
322 )
323 }));
324 let _ = resp.send(match result {
325 Ok(res) => res,
326 Err(e) => {
327 poisoned = true;
328 Err(crate::Error::Internal(format!(
329 "GL thread panicked during DrawProtoMasks: {}",
330 panic_message(e.as_ref()),
331 )))
332 }
333 });
334 }
335 GLProcessorMessage::SetColors(colors, resp) => {
336 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
337 gl_converter.set_class_colors(&colors)
338 }));
339 let _ = resp.send(match result {
340 Ok(res) => res,
341 Err(e) => {
342 poisoned = true;
343 Err(crate::Error::Internal(format!(
344 "GL thread panicked during SetColors: {}",
345 panic_message(e.as_ref()),
346 )))
347 }
348 });
349 }
350 GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
351 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
352 gl_converter.set_int8_interpolation_mode(mode);
353 Ok(())
354 }));
355 let _ = resp.send(match result {
356 Ok(res) => res,
357 Err(e) => {
358 poisoned = true;
359 Err(crate::Error::Internal(format!(
360 "GL thread panicked during SetInt8Interpolation: {}",
361 panic_message(e.as_ref()),
362 )))
363 }
364 });
365 }
366 GLProcessorMessage::PboCreate(size, resp) => {
367 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
368 let mut id: u32 = 0;
369 gls::gl::GenBuffers(1, &mut id);
370 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
371 gls::gl::BufferData(
372 gls::gl::PIXEL_PACK_BUFFER,
373 size as isize,
374 std::ptr::null(),
375 gls::gl::STREAM_COPY,
376 );
377 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
378 match check_gl_error("PboCreate", 0) {
379 Ok(()) => Ok(id),
380 Err(e) => {
381 gls::gl::DeleteBuffers(1, &id);
382 Err(e)
383 }
384 }
385 }));
386 let _ = resp.send(match result {
387 Ok(res) => res,
388 Err(e) => {
389 poisoned = true;
390 Err(crate::Error::Internal(format!(
391 "GL thread panicked during PboCreate: {}",
392 panic_message(e.as_ref()),
393 )))
394 }
395 });
396 }
397 GLProcessorMessage::PboMap(buffer_id, size, resp) => {
398 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
399 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
400 let ptr = gls::gl::MapBufferRange(
401 gls::gl::PIXEL_PACK_BUFFER,
402 0,
403 size as isize,
404 gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
405 );
406 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
407 if ptr.is_null() {
408 Err(crate::Error::OpenGl(
409 "glMapBufferRange returned null".to_string(),
410 ))
411 } else {
412 Ok(edgefirst_tensor::PboMapping {
413 ptr: ptr as *mut u8,
414 size,
415 })
416 }
417 }));
418 let _ = resp.send(match result {
419 Ok(res) => res,
420 Err(e) => {
421 poisoned = true;
422 Err(crate::Error::Internal(format!(
423 "GL thread panicked during PboMap: {}",
424 panic_message(e.as_ref()),
425 )))
426 }
427 });
428 }
429 GLProcessorMessage::PboUnmap(buffer_id, resp) => {
430 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
431 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
432 let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
433 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
434 if ok == gls::gl::FALSE {
435 Err(Error::OpenGl(
436 "PBO data was corrupted during mapping".into(),
437 ))
438 } else {
439 check_gl_error("PboUnmap", 0)
440 }
441 }));
442 let _ = resp.send(match result {
443 Ok(res) => res,
444 Err(e) => {
445 poisoned = true;
446 Err(crate::Error::Internal(format!(
447 "GL thread panicked during PboUnmap: {}",
448 panic_message(e.as_ref()),
449 )))
450 }
451 });
452 }
453 GLProcessorMessage::PboDelete(buffer_id) => {
454 if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
455 gls::gl::DeleteBuffers(1, &buffer_id);
456 })) {
457 poisoned = true;
458 log::error!(
459 "GL thread panicked during PboDelete: {}",
460 panic_message(e.as_ref()),
461 );
462 }
463 }
464 }
465 }
466 let _guard = super::context::GL_MUTEX
468 .lock()
469 .unwrap_or_else(|e| e.into_inner());
470 drop(gl_converter);
471 };
472
473 let handle = std::thread::spawn(func);
475
476 let transfer_backend = match create_ctx_recv.blocking_recv() {
477 Ok(Err(e)) => return Err(e),
478 Err(_) => {
479 return Err(Error::Internal(
480 "GL converter error messaging closed without update".to_string(),
481 ));
482 }
483 Ok(Ok(tb)) => tb,
484 };
485
486 Ok(Self {
487 handle: Some(handle),
488 sender: Some(send),
489 transfer_backend,
490 })
491 }
492}
493
494impl ImageProcessorTrait for GLProcessorThreaded {
495 fn convert(
496 &mut self,
497 src: &TensorDyn,
498 dst: &mut TensorDyn,
499 rotation: crate::Rotation,
500 flip: Flip,
501 crop: Crop,
502 ) -> crate::Result<()> {
503 let (err_send, err_recv) = tokio::sync::oneshot::channel();
504 self.sender
505 .as_ref()
506 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
507 .blocking_send(GLProcessorMessage::ImageConvert(
508 SendablePtr {
509 ptr: NonNull::from(src),
510 len: 1,
511 },
512 SendablePtr {
513 ptr: NonNull::from(dst),
514 len: 1,
515 },
516 rotation,
517 flip,
518 crop,
519 err_send,
520 ))
521 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
522 err_recv.blocking_recv().map_err(|_| {
523 Error::Internal("GL converter error messaging closed without update".to_string())
524 })?
525 }
526
527 fn draw_decoded_masks(
528 &mut self,
529 dst: &mut TensorDyn,
530 detect: &[crate::DetectBox],
531 segmentation: &[crate::Segmentation],
532 overlay: crate::MaskOverlay<'_>,
533 ) -> crate::Result<()> {
534 let (err_send, err_recv) = tokio::sync::oneshot::channel();
535 self.sender
536 .as_ref()
537 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
538 .blocking_send(GLProcessorMessage::DrawDecodedMasks(
539 SendablePtr {
540 ptr: NonNull::from(dst),
541 len: 1,
542 },
543 SendablePtr {
544 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
545 len: detect.len(),
546 },
547 SendablePtr {
548 ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
549 len: segmentation.len(),
550 },
551 overlay.opacity,
552 overlay.background.map(|bg| SendablePtr {
553 ptr: NonNull::from(bg).cast::<TensorDyn>(),
554 len: 1,
555 }),
556 overlay.letterbox,
557 overlay.color_mode,
558 err_send,
559 ))
560 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
561 err_recv.blocking_recv().map_err(|_| {
562 Error::Internal("GL converter error messaging closed without update".to_string())
563 })?
564 }
565
566 fn draw_proto_masks(
567 &mut self,
568 dst: &mut TensorDyn,
569 detect: &[DetectBox],
570 proto_data: &ProtoData,
571 overlay: crate::MaskOverlay<'_>,
572 ) -> crate::Result<()> {
573 let (err_send, err_recv) = tokio::sync::oneshot::channel();
574 self.sender
575 .as_ref()
576 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
577 .blocking_send(GLProcessorMessage::DrawProtoMasks(
578 SendablePtr {
579 ptr: NonNull::from(dst),
580 len: 1,
581 },
582 SendablePtr {
583 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
584 len: detect.len(),
585 },
586 SendablePtr {
587 ptr: NonNull::from(proto_data).cast::<ProtoData>(),
588 len: 1,
589 },
590 overlay.opacity,
591 overlay.background.map(|bg| SendablePtr {
592 ptr: NonNull::from(bg).cast::<TensorDyn>(),
593 len: 1,
594 }),
595 overlay.letterbox,
596 overlay.color_mode,
597 err_send,
598 ))
599 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
600 err_recv.blocking_recv().map_err(|_| {
601 Error::Internal("GL converter error messaging closed without update".to_string())
602 })?
603 }
604
605 fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
606 let (err_send, err_recv) = tokio::sync::oneshot::channel();
607 self.sender
608 .as_ref()
609 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
610 .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
611 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
612 err_recv.blocking_recv().map_err(|_| {
613 Error::Internal("GL converter error messaging closed without update".to_string())
614 })?
615 }
616}
617
618impl GLProcessorThreaded {
619 pub fn set_int8_interpolation_mode(
621 &mut self,
622 mode: Int8InterpolationMode,
623 ) -> Result<(), crate::Error> {
624 let (err_send, err_recv) = tokio::sync::oneshot::channel();
625 self.sender
626 .as_ref()
627 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
628 .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
629 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
630 err_recv.blocking_recv().map_err(|_| {
631 Error::Internal("GL converter error messaging closed without update".to_string())
632 })?
633 }
634
635 pub fn create_pbo_image(
637 &self,
638 width: usize,
639 height: usize,
640 format: edgefirst_tensor::PixelFormat,
641 ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
642 let sender = self
643 .sender
644 .as_ref()
645 .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
646
647 let channels = format.channels();
648 let size = match format.layout() {
649 edgefirst_tensor::PixelLayout::SemiPlanar => {
650 match format {
652 edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
653 edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
654 _ => width * height * channels,
655 }
656 }
657 edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
658 width * height * channels
659 }
660 _ => width * height * channels,
661 };
662 if size == 0 {
663 return Err(Error::OpenGl("Invalid image dimensions".to_string()));
664 }
665
666 let (tx, rx) = tokio::sync::oneshot::channel();
668 sender
669 .blocking_send(GLProcessorMessage::PboCreate(size, tx))
670 .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
671 let buffer_id = rx
672 .blocking_recv()
673 .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
674
675 let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
676 sender: sender.downgrade(),
677 });
678
679 let shape = match format.layout() {
680 edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
681 edgefirst_tensor::PixelLayout::SemiPlanar => {
682 let total_h = match format {
683 edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
684 edgefirst_tensor::PixelFormat::Nv16 => height * 2,
685 _ => height * 2,
686 };
687 vec![total_h, width]
688 }
689 _ => vec![height, width, channels],
690 };
691
692 let pbo_tensor =
693 edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
694 .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
695 let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
696 tensor
697 .set_format(format)
698 .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
699 Ok(tensor)
700 }
701
702 pub(crate) fn transfer_backend(&self) -> TransferBackend {
704 self.transfer_backend
705 }
706}
707
708impl Drop for GLProcessorThreaded {
709 fn drop(&mut self) {
710 drop(self.sender.take());
711 let _ = self.handle.take().and_then(|h| h.join().ok());
712 }
713}