1use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, Rotation};
14use edgefirst_tensor::TensorDyn;
15
16#[allow(clippy::type_complexity)]
17enum GLProcessorMessage {
18 ImageConvert(
19 SendablePtr<TensorDyn>,
20 SendablePtr<TensorDyn>,
21 Rotation,
22 Flip,
23 Crop,
24 tokio::sync::oneshot::Sender<Result<(), Error>>,
25 ),
26 SetColors(
27 Vec<[u8; 4]>,
28 tokio::sync::oneshot::Sender<Result<(), Error>>,
29 ),
30 DrawDecodedMasks(
31 SendablePtr<TensorDyn>,
32 SendablePtr<DetectBox>,
33 SendablePtr<Segmentation>,
34 f32, Option<SendablePtr<TensorDyn>>, tokio::sync::oneshot::Sender<Result<(), Error>>,
37 ),
38 DrawProtoMasks(
39 SendablePtr<TensorDyn>,
40 SendablePtr<DetectBox>,
41 SendablePtr<ProtoData>,
42 f32, Option<SendablePtr<TensorDyn>>, tokio::sync::oneshot::Sender<Result<(), Error>>,
45 ),
46 SetInt8Interpolation(
47 Int8InterpolationMode,
48 tokio::sync::oneshot::Sender<Result<(), Error>>,
49 ),
50 PboCreate(
51 usize, tokio::sync::oneshot::Sender<Result<u32, Error>>,
53 ),
54 PboMap(
55 u32, usize, tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
58 ),
59 PboUnmap(
60 u32, tokio::sync::oneshot::Sender<Result<(), Error>>,
62 ),
63 PboDelete(u32), }
65
66struct GlPboOps {
73 sender: WeakSender<GLProcessorMessage>,
74}
75
76unsafe impl edgefirst_tensor::PboOps for GlPboOps {
81 fn map_buffer(
82 &self,
83 buffer_id: u32,
84 size: usize,
85 ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
86 let sender = self
87 .sender
88 .upgrade()
89 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
90 let (tx, rx) = tokio::sync::oneshot::channel();
91 sender
92 .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
93 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
94 rx.blocking_recv()
95 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
96 .map_err(|e| {
97 edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
98 })
99 }
100
101 fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
102 let sender = self
103 .sender
104 .upgrade()
105 .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
106 let (tx, rx) = tokio::sync::oneshot::channel();
107 sender
108 .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
109 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
110 rx.blocking_recv()
111 .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
112 .map_err(|e| {
113 edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
114 })
115 }
116
117 fn delete_buffer(&self, buffer_id: u32) {
118 if let Some(sender) = self.sender.upgrade() {
119 let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
120 }
121 }
122}
123
124#[derive(Debug)]
129pub struct GLProcessorThreaded {
130 handle: Option<JoinHandle<()>>,
132
133 sender: Option<Sender<GLProcessorMessage>>,
135 transfer_backend: TransferBackend,
136}
137
138unsafe impl Send for GLProcessorThreaded {}
139unsafe impl Sync for GLProcessorThreaded {}
140
141struct SendablePtr<T: Send> {
142 ptr: NonNull<T>,
143 len: usize,
144}
145
146unsafe impl<T> Send for SendablePtr<T> where T: Send {}
147
148fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
150 if let Some(s) = info.downcast_ref::<&str>() {
151 s.to_string()
152 } else if let Some(s) = info.downcast_ref::<String>() {
153 s.clone()
154 } else {
155 "unknown panic".to_string()
156 }
157}
158
159impl GLProcessorThreaded {
160 pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
162 let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
163
164 let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
165
166 let func = move || {
167 let mut gl_converter = match GLProcessorST::new(kind) {
168 Ok(gl) => gl,
169 Err(e) => {
170 let _ = create_ctx_send.send(Err(e));
171 return;
172 }
173 };
174 let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
175 let mut poisoned = false;
176 while let Some(msg) = recv.blocking_recv() {
177 if poisoned {
182 let poison_err = crate::Error::Internal(
183 "GL context is poisoned after a prior panic".to_string(),
184 );
185 match msg {
186 GLProcessorMessage::ImageConvert(.., resp) => {
187 let _ = resp.send(Err(poison_err));
188 }
189 GLProcessorMessage::DrawDecodedMasks(.., resp) => {
190 let _ = resp.send(Err(poison_err));
191 }
192 GLProcessorMessage::DrawProtoMasks(.., resp) => {
193 let _ = resp.send(Err(poison_err));
194 }
195 GLProcessorMessage::SetColors(_, resp) => {
196 let _ = resp.send(Err(poison_err));
197 }
198 GLProcessorMessage::SetInt8Interpolation(_, resp) => {
199 let _ = resp.send(Err(poison_err));
200 }
201 GLProcessorMessage::PboCreate(_, resp) => {
202 let _ = resp.send(Err(poison_err));
203 }
204 GLProcessorMessage::PboMap(_, _, resp) => {
205 let _ = resp.send(Err(poison_err));
206 }
207 GLProcessorMessage::PboUnmap(_, resp) => {
208 let _ = resp.send(Err(poison_err));
209 }
210 GLProcessorMessage::PboDelete(_) => {}
211 }
212 continue;
213 }
214
215 match msg {
216 GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
217 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
220 let src = unsafe { src.ptr.as_ref() };
221 let dst = unsafe { dst.ptr.as_mut() };
222 gl_converter.convert(src, dst, rotation, flip, crop)
223 }));
224 let _ = resp.send(match result {
225 Ok(res) => res,
226 Err(e) => {
227 poisoned = true;
228 Err(crate::Error::Internal(format!(
229 "GL thread panicked during ImageConvert: {}",
230 panic_message(e.as_ref()),
231 )))
232 }
233 });
234 }
235 GLProcessorMessage::DrawDecodedMasks(mut dst, det, seg, opacity, bg, resp) => {
236 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
240 let dst = unsafe { dst.ptr.as_mut() };
241 let det =
242 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
243 let seg =
244 unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
245 let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
246 gl_converter.draw_decoded_masks(
247 dst,
248 det,
249 seg,
250 crate::MaskOverlay {
251 background: bg_ref,
252 opacity,
253 },
254 )
255 }));
256 let _ = resp.send(match result {
257 Ok(res) => res,
258 Err(e) => {
259 poisoned = true;
260 Err(crate::Error::Internal(format!(
261 "GL thread panicked during DrawDecodedMasks: {}",
262 panic_message(e.as_ref()),
263 )))
264 }
265 });
266 }
267 GLProcessorMessage::DrawProtoMasks(
268 mut dst,
269 det,
270 proto_data,
271 opacity,
272 bg,
273 resp,
274 ) => {
275 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
278 let dst = unsafe { dst.ptr.as_mut() };
279 let det =
280 unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
281 let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
282 let proto_data = unsafe { proto_data.ptr.as_ref() };
283 gl_converter.draw_proto_masks(
284 dst,
285 det,
286 proto_data,
287 crate::MaskOverlay {
288 background: bg_ref,
289 opacity,
290 },
291 )
292 }));
293 let _ = resp.send(match result {
294 Ok(res) => res,
295 Err(e) => {
296 poisoned = true;
297 Err(crate::Error::Internal(format!(
298 "GL thread panicked during DrawProtoMasks: {}",
299 panic_message(e.as_ref()),
300 )))
301 }
302 });
303 }
304 GLProcessorMessage::SetColors(colors, resp) => {
305 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
306 gl_converter.set_class_colors(&colors)
307 }));
308 let _ = resp.send(match result {
309 Ok(res) => res,
310 Err(e) => {
311 poisoned = true;
312 Err(crate::Error::Internal(format!(
313 "GL thread panicked during SetColors: {}",
314 panic_message(e.as_ref()),
315 )))
316 }
317 });
318 }
319 GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
320 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
321 gl_converter.set_int8_interpolation_mode(mode);
322 Ok(())
323 }));
324 let _ = resp.send(match result {
325 Ok(res) => res,
326 Err(e) => {
327 poisoned = true;
328 Err(crate::Error::Internal(format!(
329 "GL thread panicked during SetInt8Interpolation: {}",
330 panic_message(e.as_ref()),
331 )))
332 }
333 });
334 }
335 GLProcessorMessage::PboCreate(size, resp) => {
336 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
337 let mut id: u32 = 0;
338 gls::gl::GenBuffers(1, &mut id);
339 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
340 gls::gl::BufferData(
341 gls::gl::PIXEL_PACK_BUFFER,
342 size as isize,
343 std::ptr::null(),
344 gls::gl::STREAM_COPY,
345 );
346 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
347 match check_gl_error("PboCreate", 0) {
348 Ok(()) => Ok(id),
349 Err(e) => {
350 gls::gl::DeleteBuffers(1, &id);
351 Err(e)
352 }
353 }
354 }));
355 let _ = resp.send(match result {
356 Ok(res) => res,
357 Err(e) => {
358 poisoned = true;
359 Err(crate::Error::Internal(format!(
360 "GL thread panicked during PboCreate: {}",
361 panic_message(e.as_ref()),
362 )))
363 }
364 });
365 }
366 GLProcessorMessage::PboMap(buffer_id, size, resp) => {
367 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
368 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
369 let ptr = gls::gl::MapBufferRange(
370 gls::gl::PIXEL_PACK_BUFFER,
371 0,
372 size as isize,
373 gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
374 );
375 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
376 if ptr.is_null() {
377 Err(crate::Error::OpenGl(
378 "glMapBufferRange returned null".to_string(),
379 ))
380 } else {
381 Ok(edgefirst_tensor::PboMapping {
382 ptr: ptr as *mut u8,
383 size,
384 })
385 }
386 }));
387 let _ = resp.send(match result {
388 Ok(res) => res,
389 Err(e) => {
390 poisoned = true;
391 Err(crate::Error::Internal(format!(
392 "GL thread panicked during PboMap: {}",
393 panic_message(e.as_ref()),
394 )))
395 }
396 });
397 }
398 GLProcessorMessage::PboUnmap(buffer_id, resp) => {
399 let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
400 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
401 let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
402 gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
403 if ok == gls::gl::FALSE {
404 Err(Error::OpenGl(
405 "PBO data was corrupted during mapping".into(),
406 ))
407 } else {
408 check_gl_error("PboUnmap", 0)
409 }
410 }));
411 let _ = resp.send(match result {
412 Ok(res) => res,
413 Err(e) => {
414 poisoned = true;
415 Err(crate::Error::Internal(format!(
416 "GL thread panicked during PboUnmap: {}",
417 panic_message(e.as_ref()),
418 )))
419 }
420 });
421 }
422 GLProcessorMessage::PboDelete(buffer_id) => {
423 if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
424 gls::gl::DeleteBuffers(1, &buffer_id);
425 })) {
426 poisoned = true;
427 log::error!(
428 "GL thread panicked during PboDelete: {}",
429 panic_message(e.as_ref()),
430 );
431 }
432 }
433 }
434 }
435 };
436
437 let handle = std::thread::spawn(func);
439
440 let transfer_backend = match create_ctx_recv.blocking_recv() {
441 Ok(Err(e)) => return Err(e),
442 Err(_) => {
443 return Err(Error::Internal(
444 "GL converter error messaging closed without update".to_string(),
445 ));
446 }
447 Ok(Ok(tb)) => tb,
448 };
449
450 Ok(Self {
451 handle: Some(handle),
452 sender: Some(send),
453 transfer_backend,
454 })
455 }
456}
457
458impl ImageProcessorTrait for GLProcessorThreaded {
459 fn convert(
460 &mut self,
461 src: &TensorDyn,
462 dst: &mut TensorDyn,
463 rotation: crate::Rotation,
464 flip: Flip,
465 crop: Crop,
466 ) -> crate::Result<()> {
467 let (err_send, err_recv) = tokio::sync::oneshot::channel();
468 self.sender
469 .as_ref()
470 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
471 .blocking_send(GLProcessorMessage::ImageConvert(
472 SendablePtr {
473 ptr: NonNull::from(src),
474 len: 1,
475 },
476 SendablePtr {
477 ptr: NonNull::from(dst),
478 len: 1,
479 },
480 rotation,
481 flip,
482 crop,
483 err_send,
484 ))
485 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
486 err_recv.blocking_recv().map_err(|_| {
487 Error::Internal("GL converter error messaging closed without update".to_string())
488 })?
489 }
490
491 fn draw_decoded_masks(
492 &mut self,
493 dst: &mut TensorDyn,
494 detect: &[crate::DetectBox],
495 segmentation: &[crate::Segmentation],
496 overlay: crate::MaskOverlay<'_>,
497 ) -> crate::Result<()> {
498 let (err_send, err_recv) = tokio::sync::oneshot::channel();
499 self.sender
500 .as_ref()
501 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
502 .blocking_send(GLProcessorMessage::DrawDecodedMasks(
503 SendablePtr {
504 ptr: NonNull::from(dst),
505 len: 1,
506 },
507 SendablePtr {
508 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
509 len: detect.len(),
510 },
511 SendablePtr {
512 ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
513 len: segmentation.len(),
514 },
515 overlay.opacity,
516 overlay.background.map(|bg| SendablePtr {
517 ptr: NonNull::from(bg).cast::<TensorDyn>(),
518 len: 1,
519 }),
520 err_send,
521 ))
522 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
523 err_recv.blocking_recv().map_err(|_| {
524 Error::Internal("GL converter error messaging closed without update".to_string())
525 })?
526 }
527
528 fn draw_proto_masks(
529 &mut self,
530 dst: &mut TensorDyn,
531 detect: &[DetectBox],
532 proto_data: &ProtoData,
533 overlay: crate::MaskOverlay<'_>,
534 ) -> crate::Result<()> {
535 let (err_send, err_recv) = tokio::sync::oneshot::channel();
536 self.sender
537 .as_ref()
538 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
539 .blocking_send(GLProcessorMessage::DrawProtoMasks(
540 SendablePtr {
541 ptr: NonNull::from(dst),
542 len: 1,
543 },
544 SendablePtr {
545 ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
546 len: detect.len(),
547 },
548 SendablePtr {
549 ptr: NonNull::from(proto_data).cast::<ProtoData>(),
550 len: 1,
551 },
552 overlay.opacity,
553 overlay.background.map(|bg| SendablePtr {
554 ptr: NonNull::from(bg).cast::<TensorDyn>(),
555 len: 1,
556 }),
557 err_send,
558 ))
559 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
560 err_recv.blocking_recv().map_err(|_| {
561 Error::Internal("GL converter error messaging closed without update".to_string())
562 })?
563 }
564
565 fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
566 let (err_send, err_recv) = tokio::sync::oneshot::channel();
567 self.sender
568 .as_ref()
569 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
570 .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
571 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
572 err_recv.blocking_recv().map_err(|_| {
573 Error::Internal("GL converter error messaging closed without update".to_string())
574 })?
575 }
576}
577
578impl GLProcessorThreaded {
579 pub fn set_int8_interpolation_mode(
581 &mut self,
582 mode: Int8InterpolationMode,
583 ) -> Result<(), crate::Error> {
584 let (err_send, err_recv) = tokio::sync::oneshot::channel();
585 self.sender
586 .as_ref()
587 .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
588 .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
589 .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
590 err_recv.blocking_recv().map_err(|_| {
591 Error::Internal("GL converter error messaging closed without update".to_string())
592 })?
593 }
594
595 pub fn create_pbo_image(
597 &self,
598 width: usize,
599 height: usize,
600 format: edgefirst_tensor::PixelFormat,
601 ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
602 let sender = self
603 .sender
604 .as_ref()
605 .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
606
607 let channels = format.channels();
608 let size = match format.layout() {
609 edgefirst_tensor::PixelLayout::SemiPlanar => {
610 match format {
612 edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
613 edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
614 _ => width * height * channels,
615 }
616 }
617 edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
618 width * height * channels
619 }
620 _ => width * height * channels,
621 };
622 if size == 0 {
623 return Err(Error::OpenGl("Invalid image dimensions".to_string()));
624 }
625
626 let (tx, rx) = tokio::sync::oneshot::channel();
628 sender
629 .blocking_send(GLProcessorMessage::PboCreate(size, tx))
630 .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
631 let buffer_id = rx
632 .blocking_recv()
633 .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
634
635 let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
636 sender: sender.downgrade(),
637 });
638
639 let shape = match format.layout() {
640 edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
641 edgefirst_tensor::PixelLayout::SemiPlanar => {
642 let total_h = match format {
643 edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
644 edgefirst_tensor::PixelFormat::Nv16 => height * 2,
645 _ => height * 2,
646 };
647 vec![total_h, width]
648 }
649 _ => vec![height, width, channels],
650 };
651
652 let pbo_tensor =
653 edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
654 .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
655 let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
656 tensor
657 .set_format(format)
658 .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
659 Ok(tensor)
660 }
661
662 pub(crate) fn transfer_backend(&self) -> TransferBackend {
664 self.transfer_backend
665 }
666}
667
668impl Drop for GLProcessorThreaded {
669 fn drop(&mut self) {
670 drop(self.sender.take());
671 let _ = self.handle.take().and_then(|h| h.join().ok());
672 }
673}