1use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, MaskRegion, Rotation};
14use edgefirst_tensor::TensorDyn;
15
16#[allow(clippy::type_complexity)]
17enum GLProcessorMessage {
18 ImageConvert(
19 SendablePtr<TensorDyn>,
20 SendablePtr<TensorDyn>,
21 Rotation,
22 Flip,
23 Crop,
24 tokio::sync::oneshot::Sender<Result<(), Error>>,
25 ),
26 SetColors(
27 Vec<[u8; 4]>,
28 tokio::sync::oneshot::Sender<Result<(), Error>>,
29 ),
30 DrawMasks(
31 SendablePtr<TensorDyn>,
32 SendablePtr<DetectBox>,
33 SendablePtr<Segmentation>,
34 tokio::sync::oneshot::Sender<Result<(), Error>>,
35 ),
36 DrawMasksProto(
37 SendablePtr<TensorDyn>,
38 SendablePtr<DetectBox>,
39 Box<ProtoData>,
40 tokio::sync::oneshot::Sender<Result<(), Error>>,
41 ),
42 SetInt8Interpolation(
43 Int8InterpolationMode,
44 tokio::sync::oneshot::Sender<Result<(), Error>>,
45 ),
46 DecodeMasksAtlas(
47 SendablePtr<DetectBox>,
48 Box<ProtoData>,
49 usize, usize, tokio::sync::oneshot::Sender<Result<(Vec<u8>, Vec<MaskRegion>), Error>>,
52 ),
53 PboCreate(
54 usize, tokio::sync::oneshot::Sender<Result<u32, Error>>,
56 ),
57 PboMap(
58 u32, usize, tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
61 ),
62 PboUnmap(
63 u32, tokio::sync::oneshot::Sender<Result<(), Error>>,
65 ),
66 PboDelete(u32), }
68
69struct GlPboOps {
76 sender: WeakSender<GLProcessorMessage>,
77}
78
79unsafe impl edgefirst_tensor::PboOps for GlPboOps {
84 fn map_buffer(
85 &self,
86 buffer_id: u32,
87 size: usize,
88 ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
89 let sender = self
90 .sender
91 .upgrade()
92 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
93 let (tx, rx) = tokio::sync::oneshot::channel();
94 sender
95 .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
96 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
97 rx.blocking_recv()
98 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
99 .map_err(|e| {
100 edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
101 })
102 }
103
104 fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
105 let sender = self
106 .sender
107 .upgrade()
108 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
109 let (tx, rx) = tokio::sync::oneshot::channel();
110 sender
111 .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
112 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
113 rx.blocking_recv()
114 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
115 .map_err(|e| {
116 edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
117 })
118 }
119
120 fn delete_buffer(&self, buffer_id: u32) {
121 if let Some(sender) = self.sender.upgrade() {
122 let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
123 }
124 }
125}
126
127#[derive(Debug)]
132pub struct GLProcessorThreaded {
133 handle: Option<JoinHandle<()>>,
135
136 sender: Option<Sender<GLProcessorMessage>>,
138 transfer_backend: TransferBackend,
139}
140
141unsafe impl Send for GLProcessorThreaded {}
142unsafe impl Sync for GLProcessorThreaded {}
143
144struct SendablePtr<T: Send> {
145 ptr: NonNull<T>,
146 len: usize,
147}
148
149unsafe impl<T> Send for SendablePtr<T> where T: Send {}
150
151fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
153 if let Some(s) = info.downcast_ref::<&str>() {
154 s.to_string()
155 } else if let Some(s) = info.downcast_ref::<String>() {
156 s.clone()
157 } else {
158 "unknown panic".to_string()
159 }
160}
161
162impl GLProcessorThreaded {
163 pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
165 let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
166
167 let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
168
169 let func = move || {
170 let mut gl_converter = match GLProcessorST::new(kind) {
171 Ok(gl) => gl,
172 Err(e) => {
173 let _ = create_ctx_send.send(Err(e));
174 return;
175 }
176 };
177 let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
178 let mut poisoned = false;
179 while let Some(msg) = recv.blocking_recv() {
180 if poisoned {
185 let poison_err = crate::Error::Internal(
186 "GL context is poisoned after a prior panic".to_string(),
187 );
188 match msg {
189 GLProcessorMessage::ImageConvert(.., resp) => {
190 let _ = resp.send(Err(poison_err));
191 }
192 GLProcessorMessage::DrawMasks(.., resp) => {
193 let _ = resp.send(Err(poison_err));
194 }
195 GLProcessorMessage::DrawMasksProto(.., resp) => {
196 let _ = resp.send(Err(poison_err));
197 }
198 GLProcessorMessage::SetColors(_, resp) => {
199 let _ = resp.send(Err(poison_err));
200 }
201 GLProcessorMessage::SetInt8Interpolation(_, resp) => {
202 let _ = resp.send(Err(poison_err));
203 }
204 GLProcessorMessage::DecodeMasksAtlas(.., resp) => {
205 let _ = resp.send(Err(poison_err));
206 }
207 GLProcessorMessage::PboCreate(_, resp) => {
208 let _ = resp.send(Err(poison_err));
209 }
210 GLProcessorMessage::PboMap(_, _, resp) => {
211 let _ = resp.send(Err(poison_err));
212 }
213 GLProcessorMessage::PboUnmap(_, resp) => {
214 let _ = resp.send(Err(poison_err));
215 }
216 GLProcessorMessage::PboDelete(_) => {}
217 }
218 continue;
219 }
220
221 match msg {
222 GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
223 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
226 let src = unsafe { src.ptr.as_ref() };
227 let dst = unsafe { dst.ptr.as_mut() };
228 gl_converter.convert(src, dst, rotation, flip, crop)
229 }));
230 let _ = resp.send(match result {
231 Ok(res) => res,
232 Err(e) => {
233 poisoned = true;
234 Err(crate::Error::Internal(format!(
235 "GL thread panicked during ImageConvert: {}",
236 panic_message(e.as_ref()),
237 )))
238 }
239 });
240 }
241 GLProcessorMessage::DrawMasks(mut dst, det, seg, resp) => {
242 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
246 let dst = unsafe { dst.ptr.as_mut() };
247 let det =
248 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
249 let seg =
250 unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
251 gl_converter.draw_masks(dst, det, seg)
252 }));
253 let _ = resp.send(match result {
254 Ok(res) => res,
255 Err(e) => {
256 poisoned = true;
257 Err(crate::Error::Internal(format!(
258 "GL thread panicked during DrawMasks: {}",
259 panic_message(e.as_ref()),
260 )))
261 }
262 });
263 }
264 GLProcessorMessage::DrawMasksProto(mut dst, det, proto_data, resp) => {
265 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
268 let dst = unsafe { dst.ptr.as_mut() };
269 let det =
270 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
271 gl_converter.draw_masks_proto(dst, det, &proto_data)
272 }));
273 let _ = resp.send(match result {
274 Ok(res) => res,
275 Err(e) => {
276 poisoned = true;
277 Err(crate::Error::Internal(format!(
278 "GL thread panicked during DrawMasksProto: {}",
279 panic_message(e.as_ref()),
280 )))
281 }
282 });
283 }
284 GLProcessorMessage::SetColors(colors, resp) => {
285 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
286 gl_converter.set_class_colors(&colors)
287 }));
288 let _ = resp.send(match result {
289 Ok(res) => res,
290 Err(e) => {
291 poisoned = true;
292 Err(crate::Error::Internal(format!(
293 "GL thread panicked during SetColors: {}",
294 panic_message(e.as_ref()),
295 )))
296 }
297 });
298 }
299 GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
300 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
301 gl_converter.set_int8_interpolation_mode(mode);
302 Ok(())
303 }));
304 let _ = resp.send(match result {
305 Ok(res) => res,
306 Err(e) => {
307 poisoned = true;
308 Err(crate::Error::Internal(format!(
309 "GL thread panicked during SetInt8Interpolation: {}",
310 panic_message(e.as_ref()),
311 )))
312 }
313 });
314 }
315 GLProcessorMessage::DecodeMasksAtlas(
316 det,
317 proto_data,
318 output_width,
319 output_height,
320 resp,
321 ) => {
322 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
323 let det =
324 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
325 gl_converter.decode_masks_atlas(
326 det,
327 &proto_data,
328 output_width,
329 output_height,
330 )
331 }));
332 let _ = resp.send(match result {
333 Ok(res) => res,
334 Err(e) => {
335 poisoned = true;
336 Err(crate::Error::Internal(format!(
337 "GL thread panicked during DecodeMasksAtlas: {}",
338 panic_message(e.as_ref()),
339 )))
340 }
341 });
342 }
343 GLProcessorMessage::PboCreate(size, resp) => {
344 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
345 let mut id: u32 = 0;
346 gls::gl::GenBuffers(1, &mut id);
347 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
348 gls::gl::BufferData(
349 gls::gl::PIXEL_PACK_BUFFER,
350 size as isize,
351 std::ptr::null(),
352 gls::gl::STREAM_COPY,
353 );
354 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
355 match check_gl_error("PboCreate", 0) {
356 Ok(()) => Ok(id),
357 Err(e) => {
358 gls::gl::DeleteBuffers(1, &id);
359 Err(e)
360 }
361 }
362 }));
363 let _ = resp.send(match result {
364 Ok(res) => res,
365 Err(e) => {
366 poisoned = true;
367 Err(crate::Error::Internal(format!(
368 "GL thread panicked during PboCreate: {}",
369 panic_message(e.as_ref()),
370 )))
371 }
372 });
373 }
374 GLProcessorMessage::PboMap(buffer_id, size, resp) => {
375 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
376 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
377 let ptr = gls::gl::MapBufferRange(
378 gls::gl::PIXEL_PACK_BUFFER,
379 0,
380 size as isize,
381 gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
382 );
383 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
384 if ptr.is_null() {
385 Err(crate::Error::OpenGl(
386 "glMapBufferRange returned null".to_string(),
387 ))
388 } else {
389 Ok(edgefirst_tensor::PboMapping {
390 ptr: ptr as *mut u8,
391 size,
392 })
393 }
394 }));
395 let _ = resp.send(match result {
396 Ok(res) => res,
397 Err(e) => {
398 poisoned = true;
399 Err(crate::Error::Internal(format!(
400 "GL thread panicked during PboMap: {}",
401 panic_message(e.as_ref()),
402 )))
403 }
404 });
405 }
406 GLProcessorMessage::PboUnmap(buffer_id, resp) => {
407 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
408 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
409 let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
410 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
411 if ok == gls::gl::FALSE {
412 Err(Error::OpenGl(
413 "PBO data was corrupted during mapping".into(),
414 ))
415 } else {
416 check_gl_error("PboUnmap", 0)
417 }
418 }));
419 let _ = resp.send(match result {
420 Ok(res) => res,
421 Err(e) => {
422 poisoned = true;
423 Err(crate::Error::Internal(format!(
424 "GL thread panicked during PboUnmap: {}",
425 panic_message(e.as_ref()),
426 )))
427 }
428 });
429 }
430 GLProcessorMessage::PboDelete(buffer_id) => {
431 if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
432 gls::gl::DeleteBuffers(1, &buffer_id);
433 })) {
434 poisoned = true;
435 log::error!(
436 "GL thread panicked during PboDelete: {}",
437 panic_message(e.as_ref()),
438 );
439 }
440 }
441 }
442 }
443 };
444
445 let handle = std::thread::spawn(func);
447
448 let transfer_backend = match create_ctx_recv.blocking_recv() {
449 Ok(Err(e)) => return Err(e),
450 Err(_) => {
451 return Err(Error::Internal(
452 "GL converter error messaging closed without update".to_string(),
453 ));
454 }
455 Ok(Ok(tb)) => tb,
456 };
457
458 Ok(Self {
459 handle: Some(handle),
460 sender: Some(send),
461 transfer_backend,
462 })
463 }
464}
465
466impl ImageProcessorTrait for GLProcessorThreaded {
467 fn convert(
468 &mut self,
469 src: &TensorDyn,
470 dst: &mut TensorDyn,
471 rotation: crate::Rotation,
472 flip: Flip,
473 crop: Crop,
474 ) -> crate::Result<()> {
475 let (err_send, err_recv) = tokio::sync::oneshot::channel();
476 self.sender
477 .as_ref()
478 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
479 .blocking_send(GLProcessorMessage::ImageConvert(
480 SendablePtr {
481 ptr: NonNull::from(src),
482 len: 1,
483 },
484 SendablePtr {
485 ptr: NonNull::from(dst),
486 len: 1,
487 },
488 rotation,
489 flip,
490 crop,
491 err_send,
492 ))
493 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
494 err_recv.blocking_recv().map_err(|_| {
495 Error::Internal("GL converter error messaging closed without update".to_string())
496 })?
497 }
498
499 fn draw_masks(
500 &mut self,
501 dst: &mut TensorDyn,
502 detect: &[crate::DetectBox],
503 segmentation: &[crate::Segmentation],
504 ) -> crate::Result<()> {
505 let (err_send, err_recv) = tokio::sync::oneshot::channel();
506 self.sender
507 .as_ref()
508 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
509 .blocking_send(GLProcessorMessage::DrawMasks(
510 SendablePtr {
511 ptr: NonNull::from(dst),
512 len: 1,
513 },
514 SendablePtr {
515 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
516 len: detect.len(),
517 },
518 SendablePtr {
519 ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
520 len: segmentation.len(),
521 },
522 err_send,
523 ))
524 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
525 err_recv.blocking_recv().map_err(|_| {
526 Error::Internal("GL converter error messaging closed without update".to_string())
527 })?
528 }
529
530 fn draw_masks_proto(
531 &mut self,
532 dst: &mut TensorDyn,
533 detect: &[DetectBox],
534 proto_data: &ProtoData,
535 ) -> crate::Result<()> {
536 let (err_send, err_recv) = tokio::sync::oneshot::channel();
537 self.sender
538 .as_ref()
539 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
540 .blocking_send(GLProcessorMessage::DrawMasksProto(
541 SendablePtr {
542 ptr: NonNull::from(dst),
543 len: 1,
544 },
545 SendablePtr {
546 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
547 len: detect.len(),
548 },
549 Box::new(proto_data.clone()),
550 err_send,
551 ))
552 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
553 err_recv.blocking_recv().map_err(|_| {
554 Error::Internal("GL converter error messaging closed without update".to_string())
555 })?
556 }
557
558 fn decode_masks_atlas(
559 &mut self,
560 detect: &[DetectBox],
561 proto_data: ProtoData,
562 output_width: usize,
563 output_height: usize,
564 ) -> crate::Result<(Vec<u8>, Vec<MaskRegion>)> {
565 GLProcessorThreaded::decode_masks_atlas(
566 self,
567 detect,
568 proto_data,
569 output_width,
570 output_height,
571 )
572 }
573
574 fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
575 let (err_send, err_recv) = tokio::sync::oneshot::channel();
576 self.sender
577 .as_ref()
578 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
579 .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
580 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
581 err_recv.blocking_recv().map_err(|_| {
582 Error::Internal("GL converter error messaging closed without update".to_string())
583 })?
584 }
585}
586
587impl GLProcessorThreaded {
588 pub fn set_int8_interpolation_mode(
590 &mut self,
591 mode: Int8InterpolationMode,
592 ) -> Result<(), crate::Error> {
593 let (err_send, err_recv) = tokio::sync::oneshot::channel();
594 self.sender
595 .as_ref()
596 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
597 .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
598 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
599 err_recv.blocking_recv().map_err(|_| {
600 Error::Internal("GL converter error messaging closed without update".to_string())
601 })?
602 }
603
604 pub fn decode_masks_atlas(
610 &mut self,
611 detect: &[DetectBox],
612 proto_data: ProtoData,
613 output_width: usize,
614 output_height: usize,
615 ) -> Result<(Vec<u8>, Vec<MaskRegion>), crate::Error> {
616 let (resp_send, resp_recv) = tokio::sync::oneshot::channel();
617 self.sender
618 .as_ref()
619 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
620 .blocking_send(GLProcessorMessage::DecodeMasksAtlas(
621 SendablePtr {
622 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
623 len: detect.len(),
624 },
625 Box::new(proto_data),
626 output_width,
627 output_height,
628 resp_send,
629 ))
630 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
631 resp_recv.blocking_recv().map_err(|_| {
632 Error::Internal("GL converter error messaging closed without update".to_string())
633 })?
634 }
635
636 pub fn create_pbo_image(
638 &self,
639 width: usize,
640 height: usize,
641 format: edgefirst_tensor::PixelFormat,
642 ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
643 let sender = self
644 .sender
645 .as_ref()
646 .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
647
648 let channels = format.channels();
649 let size = match format.layout() {
650 edgefirst_tensor::PixelLayout::SemiPlanar => {
651 match format {
653 edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
654 edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
655 _ => width * height * channels,
656 }
657 }
658 edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
659 width * height * channels
660 }
661 _ => width * height * channels,
662 };
663 if size == 0 {
664 return Err(Error::OpenGl("Invalid image dimensions".to_string()));
665 }
666
667 let (tx, rx) = tokio::sync::oneshot::channel();
669 sender
670 .blocking_send(GLProcessorMessage::PboCreate(size, tx))
671 .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
672 let buffer_id = rx
673 .blocking_recv()
674 .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
675
676 let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
677 sender: sender.downgrade(),
678 });
679
680 let shape = match format.layout() {
681 edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
682 edgefirst_tensor::PixelLayout::SemiPlanar => {
683 let total_h = match format {
684 edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
685 edgefirst_tensor::PixelFormat::Nv16 => height * 2,
686 _ => height * 2,
687 };
688 vec![total_h, width]
689 }
690 _ => vec![height, width, channels],
691 };
692
693 let pbo_tensor =
694 edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
695 .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
696 let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
697 tensor
698 .set_format(format)
699 .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
700 Ok(tensor)
701 }
702
703 pub(crate) fn transfer_backend(&self) -> TransferBackend {
705 self.transfer_backend
706 }
707}
708
709impl Drop for GLProcessorThreaded {
710 fn drop(&mut self) {
711 drop(self.sender.take());
712 let _ = self.handle.take().and_then(|h| h.join().ok());
713 }
714}