Skip to main content

edgefirst_image/gl/
threaded.rs

1// SPDX-FileCopyrightText: Copyright 2025 Au-Zone Technologies
2// SPDX-License-Identifier: Apache-2.0
3
4use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, MaskRegion, Rotation};
14use edgefirst_tensor::TensorDyn;
15
/// Requests sent over the mpsc channel to the dedicated GL thread.
///
/// Every variant except [`GLProcessorMessage::PboDelete`] carries a
/// `oneshot::Sender` as its last field; the GL thread uses it to report the
/// outcome of the request back to the caller, which blocks on the matching
/// receiver.
///
/// Variants holding a [`SendablePtr`] transport *borrowed* data as raw
/// pointers: the requesting side must keep the referents alive until the reply
/// has been received.
// Presumably the nested `oneshot::Sender<Result<(Vec<u8>, Vec<MaskRegion>), Error>>`
// payload is what trips clippy's type-complexity lint — TODO confirm.
#[allow(clippy::type_complexity)]
enum GLProcessorMessage {
    /// Run an image conversion. Fields, in order: source tensor, destination
    /// tensor, rotation, flip, crop, reply channel.
    ImageConvert(
        SendablePtr<TensorDyn>,
        SendablePtr<TensorDyn>,
        Rotation,
        Flip,
        Crop,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Replace the class colour table (owned copy of the colours) and reply
    /// with the result.
    SetColors(
        Vec<[u8; 4]>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Draw masks into a destination tensor. Fields, in order: destination
    /// tensor, detection slice (pointer + length), segmentation slice
    /// (pointer + length), reply channel.
    DrawMasks(
        SendablePtr<TensorDyn>,
        SendablePtr<DetectBox>,
        SendablePtr<Segmentation>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Draw masks from prototype data. Fields, in order: destination tensor,
    /// detection slice (pointer + length), owned prototype data, reply channel.
    DrawMasksProto(
        SendablePtr<TensorDyn>,
        SendablePtr<DetectBox>,
        Box<ProtoData>,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Change the int8 interpolation mode and reply once it has been applied.
    SetInt8Interpolation(
        Int8InterpolationMode,
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Decode the masks of all detections into an atlas. The reply carries the
    /// atlas bytes together with one `MaskRegion` list.
    DecodeMasksAtlas(
        SendablePtr<DetectBox>,
        Box<ProtoData>,
        usize, // output_width
        usize, // output_height
        tokio::sync::oneshot::Sender<Result<(Vec<u8>, Vec<MaskRegion>), Error>>,
    ),
    /// Allocate a pixel buffer object; the reply carries the new GL buffer id.
    PboCreate(
        usize, // buffer size in bytes
        tokio::sync::oneshot::Sender<Result<u32, Error>>,
    ),
    /// Map a pixel buffer object; the reply carries the resulting mapping
    /// (pointer and size).
    PboMap(
        u32,   // buffer_id
        usize, // size
        tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
    ),
    /// Unmap a previously mapped pixel buffer object.
    PboUnmap(
        u32, // buffer_id
        tokio::sync::oneshot::Sender<Result<(), Error>>,
    ),
    /// Delete a pixel buffer object by id.
    PboDelete(u32), // fire-and-forget, no reply
}
68
/// Implements `PboOps` by sending commands to the GL thread.
///
/// Uses a `WeakSender` so that PBO images don't keep the GL thread's channel
/// alive. When the `GLProcessorThreaded` is dropped, its `Sender` is the last
/// long-lived strong reference — dropping it closes the channel and lets the
/// GL thread exit. PBO map/unmap operations after that return
/// `PboDisconnected`; deletions are silently skipped.
struct GlPboOps {
    /// Weak handle to the GL thread's request channel; it is upgraded to a
    /// strong sender only for the duration of a single PBO operation.
    sender: WeakSender<GLProcessorMessage>,
}
78
79// SAFETY: GlPboOps sends all GL operations to the dedicated GL thread via a
80// channel. `map_buffer` returns a CPU-visible pointer from `glMapBufferRange`
81// that remains valid until `unmap_buffer` calls `glUnmapBuffer` on the GL thread.
82// `delete_buffer` sends a fire-and-forget deletion command to the GL thread.
83unsafe impl edgefirst_tensor::PboOps for GlPboOps {
84    fn map_buffer(
85        &self,
86        buffer_id: u32,
87        size: usize,
88    ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
89        let sender = self
90            .sender
91            .upgrade()
92            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
93        let (tx, rx) = tokio::sync::oneshot::channel();
94        sender
95            .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
96            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
97        rx.blocking_recv()
98            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
99            .map_err(|e| {
100                edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
101            })
102    }
103
104    fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
105        let sender = self
106            .sender
107            .upgrade()
108            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
109        let (tx, rx) = tokio::sync::oneshot::channel();
110        sender
111            .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
112            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
113        rx.blocking_recv()
114            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
115            .map_err(|e| {
116                edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
117            })
118    }
119
120    fn delete_buffer(&self, buffer_id: u32) {
121        if let Some(sender) = self.sender.upgrade() {
122            let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
123        }
124    }
125}
126
/// OpenGL multi-threaded image converter. The actual conversion is done in a
/// separate rendering thread, as OpenGL contexts are not thread-safe. This can
/// be safely sent between threads. The `convert()` call sends the conversion
/// request to the rendering thread and waits for the result.
///
/// Dropping the value closes the request channel and joins the rendering
/// thread.
#[derive(Debug)]
pub struct GLProcessorThreaded {
    // Join handle of the rendering thread.
    // This is only None when the converter is being dropped.
    handle: Option<JoinHandle<()>>,

    // Strong sending half of the request channel to the rendering thread.
    // This is only None when the converter is being dropped.
    sender: Option<Sender<GLProcessorMessage>>,
    // Transfer backend reported by the rendering thread right after its GL
    // context was created; cached so it can be read without a round-trip.
    transfer_backend: TransferBackend,
}
140
// SAFETY: `GLProcessorThreaded` never performs GL calls itself. It only stores
// a thread join handle, the sending half of the request channel and a copied
// `TransferBackend` value; all GL work is forwarded as messages and executed
// on the single rendering thread.
// NOTE(review): these impls are presumably required because a message payload
// (raw pointers inside `SendablePtr` / `PboMapping`) prevents the channel
// sender from being auto-`Send`/`Sync` — confirm, and re-check this
// justification whenever a new payload type is added to `GLProcessorMessage`.
unsafe impl Send for GLProcessorThreaded {}
unsafe impl Sync for GLProcessorThreaded {}
143
/// Raw pointer (plus element count) used to lend borrowed data to the GL
/// thread inside a `GLProcessorMessage`.
///
/// The side that builds a `SendablePtr` from a reference or slice must keep
/// that borrow alive until the GL thread has answered the request; the GL
/// thread dereferences the pointer only while handling that one message.
struct SendablePtr<T: Send> {
    /// Address of the first (or only) element.
    ptr: NonNull<T>,
    /// Number of elements behind `ptr`: 1 for a single value, the slice length
    /// for slices (may be 0).
    len: usize,
}

// SAFETY: the pointer is only dereferenced on the GL thread while the
// requesting thread is blocked waiting for the reply, so the referent outlives
// every access.
// NOTE(review): some of these pointers are created from shared references
// (`&T`), for which `T: Sync` would be the precise bound rather than
// `T: Send` — confirm the payload types are `Sync`.
unsafe impl<T> Send for SendablePtr<T> where T: Send {}
150
/// Turns the payload returned by `catch_unwind` into printable text.
///
/// Payloads that are a `&str` or a `String` are returned as-is; any other
/// payload type yields the fixed text `"unknown panic"`.
fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
    info.downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| info.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| String::from("unknown panic"))
}
161
162impl GLProcessorThreaded {
163    /// Creates a new OpenGL multi-threaded image converter.
164    pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
165        let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
166
167        let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
168
169        let func = move || {
170            let mut gl_converter = match GLProcessorST::new(kind) {
171                Ok(gl) => gl,
172                Err(e) => {
173                    let _ = create_ctx_send.send(Err(e));
174                    return;
175                }
176            };
177            let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
178            let mut poisoned = false;
179            while let Some(msg) = recv.blocking_recv() {
180                // After a panic, the GL context is in an undefined state. Reject
181                // all subsequent messages with an error rather than risking wrong
182                // output or a GPU hang from corrupted GL state. This follows the
183                // same pattern as std::sync::Mutex poisoning.
184                if poisoned {
185                    let poison_err = crate::Error::Internal(
186                        "GL context is poisoned after a prior panic".to_string(),
187                    );
188                    match msg {
189                        GLProcessorMessage::ImageConvert(.., resp) => {
190                            let _ = resp.send(Err(poison_err));
191                        }
192                        GLProcessorMessage::DrawMasks(.., resp) => {
193                            let _ = resp.send(Err(poison_err));
194                        }
195                        GLProcessorMessage::DrawMasksProto(.., resp) => {
196                            let _ = resp.send(Err(poison_err));
197                        }
198                        GLProcessorMessage::SetColors(_, resp) => {
199                            let _ = resp.send(Err(poison_err));
200                        }
201                        GLProcessorMessage::SetInt8Interpolation(_, resp) => {
202                            let _ = resp.send(Err(poison_err));
203                        }
204                        GLProcessorMessage::DecodeMasksAtlas(.., resp) => {
205                            let _ = resp.send(Err(poison_err));
206                        }
207                        GLProcessorMessage::PboCreate(_, resp) => {
208                            let _ = resp.send(Err(poison_err));
209                        }
210                        GLProcessorMessage::PboMap(_, _, resp) => {
211                            let _ = resp.send(Err(poison_err));
212                        }
213                        GLProcessorMessage::PboUnmap(_, resp) => {
214                            let _ = resp.send(Err(poison_err));
215                        }
216                        GLProcessorMessage::PboDelete(_) => {}
217                    }
218                    continue;
219                }
220
221                match msg {
222                    GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
223                        // SAFETY: This is safe because the convert() function waits for the resp to
224                        // be sent before dropping the borrow for src and dst
225                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
226                            let src = unsafe { src.ptr.as_ref() };
227                            let dst = unsafe { dst.ptr.as_mut() };
228                            gl_converter.convert(src, dst, rotation, flip, crop)
229                        }));
230                        let _ = resp.send(match result {
231                            Ok(res) => res,
232                            Err(e) => {
233                                poisoned = true;
234                                Err(crate::Error::Internal(format!(
235                                    "GL thread panicked during ImageConvert: {}",
236                                    panic_message(e.as_ref()),
237                                )))
238                            }
239                        });
240                    }
241                    GLProcessorMessage::DrawMasks(mut dst, det, seg, resp) => {
242                        // SAFETY: This is safe because the draw_masks() function waits for the
243                        // resp to be sent before dropping the borrow for dst, detect, and
244                        // segmentation
245                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
246                            let dst = unsafe { dst.ptr.as_mut() };
247                            let det =
248                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
249                            let seg =
250                                unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
251                            gl_converter.draw_masks(dst, det, seg)
252                        }));
253                        let _ = resp.send(match result {
254                            Ok(res) => res,
255                            Err(e) => {
256                                poisoned = true;
257                                Err(crate::Error::Internal(format!(
258                                    "GL thread panicked during DrawMasks: {}",
259                                    panic_message(e.as_ref()),
260                                )))
261                            }
262                        });
263                    }
264                    GLProcessorMessage::DrawMasksProto(mut dst, det, proto_data, resp) => {
265                        // SAFETY: Same safety invariant as DrawMasks — caller
266                        // blocks on resp before dropping borrows.
267                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
268                            let dst = unsafe { dst.ptr.as_mut() };
269                            let det =
270                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
271                            gl_converter.draw_masks_proto(dst, det, &proto_data)
272                        }));
273                        let _ = resp.send(match result {
274                            Ok(res) => res,
275                            Err(e) => {
276                                poisoned = true;
277                                Err(crate::Error::Internal(format!(
278                                    "GL thread panicked during DrawMasksProto: {}",
279                                    panic_message(e.as_ref()),
280                                )))
281                            }
282                        });
283                    }
284                    GLProcessorMessage::SetColors(colors, resp) => {
285                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
286                            gl_converter.set_class_colors(&colors)
287                        }));
288                        let _ = resp.send(match result {
289                            Ok(res) => res,
290                            Err(e) => {
291                                poisoned = true;
292                                Err(crate::Error::Internal(format!(
293                                    "GL thread panicked during SetColors: {}",
294                                    panic_message(e.as_ref()),
295                                )))
296                            }
297                        });
298                    }
299                    GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
300                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
301                            gl_converter.set_int8_interpolation_mode(mode);
302                            Ok(())
303                        }));
304                        let _ = resp.send(match result {
305                            Ok(res) => res,
306                            Err(e) => {
307                                poisoned = true;
308                                Err(crate::Error::Internal(format!(
309                                    "GL thread panicked during SetInt8Interpolation: {}",
310                                    panic_message(e.as_ref()),
311                                )))
312                            }
313                        });
314                    }
315                    GLProcessorMessage::DecodeMasksAtlas(
316                        det,
317                        proto_data,
318                        output_width,
319                        output_height,
320                        resp,
321                    ) => {
322                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
323                            let det =
324                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
325                            gl_converter.decode_masks_atlas(
326                                det,
327                                &proto_data,
328                                output_width,
329                                output_height,
330                            )
331                        }));
332                        let _ = resp.send(match result {
333                            Ok(res) => res,
334                            Err(e) => {
335                                poisoned = true;
336                                Err(crate::Error::Internal(format!(
337                                    "GL thread panicked during DecodeMasksAtlas: {}",
338                                    panic_message(e.as_ref()),
339                                )))
340                            }
341                        });
342                    }
343                    GLProcessorMessage::PboCreate(size, resp) => {
344                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
345                            let mut id: u32 = 0;
346                            gls::gl::GenBuffers(1, &mut id);
347                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
348                            gls::gl::BufferData(
349                                gls::gl::PIXEL_PACK_BUFFER,
350                                size as isize,
351                                std::ptr::null(),
352                                gls::gl::STREAM_COPY,
353                            );
354                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
355                            match check_gl_error("PboCreate", 0) {
356                                Ok(()) => Ok(id),
357                                Err(e) => {
358                                    gls::gl::DeleteBuffers(1, &id);
359                                    Err(e)
360                                }
361                            }
362                        }));
363                        let _ = resp.send(match result {
364                            Ok(res) => res,
365                            Err(e) => {
366                                poisoned = true;
367                                Err(crate::Error::Internal(format!(
368                                    "GL thread panicked during PboCreate: {}",
369                                    panic_message(e.as_ref()),
370                                )))
371                            }
372                        });
373                    }
374                    GLProcessorMessage::PboMap(buffer_id, size, resp) => {
375                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
376                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
377                            let ptr = gls::gl::MapBufferRange(
378                                gls::gl::PIXEL_PACK_BUFFER,
379                                0,
380                                size as isize,
381                                gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
382                            );
383                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
384                            if ptr.is_null() {
385                                Err(crate::Error::OpenGl(
386                                    "glMapBufferRange returned null".to_string(),
387                                ))
388                            } else {
389                                Ok(edgefirst_tensor::PboMapping {
390                                    ptr: ptr as *mut u8,
391                                    size,
392                                })
393                            }
394                        }));
395                        let _ = resp.send(match result {
396                            Ok(res) => res,
397                            Err(e) => {
398                                poisoned = true;
399                                Err(crate::Error::Internal(format!(
400                                    "GL thread panicked during PboMap: {}",
401                                    panic_message(e.as_ref()),
402                                )))
403                            }
404                        });
405                    }
406                    GLProcessorMessage::PboUnmap(buffer_id, resp) => {
407                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
408                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
409                            let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
410                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
411                            if ok == gls::gl::FALSE {
412                                Err(Error::OpenGl(
413                                    "PBO data was corrupted during mapping".into(),
414                                ))
415                            } else {
416                                check_gl_error("PboUnmap", 0)
417                            }
418                        }));
419                        let _ = resp.send(match result {
420                            Ok(res) => res,
421                            Err(e) => {
422                                poisoned = true;
423                                Err(crate::Error::Internal(format!(
424                                    "GL thread panicked during PboUnmap: {}",
425                                    panic_message(e.as_ref()),
426                                )))
427                            }
428                        });
429                    }
430                    GLProcessorMessage::PboDelete(buffer_id) => {
431                        if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
432                            gls::gl::DeleteBuffers(1, &buffer_id);
433                        })) {
434                            poisoned = true;
435                            log::error!(
436                                "GL thread panicked during PboDelete: {}",
437                                panic_message(e.as_ref()),
438                            );
439                        }
440                    }
441                }
442            }
443        };
444
445        // let handle = tokio::task::spawn(func());
446        let handle = std::thread::spawn(func);
447
448        let transfer_backend = match create_ctx_recv.blocking_recv() {
449            Ok(Err(e)) => return Err(e),
450            Err(_) => {
451                return Err(Error::Internal(
452                    "GL converter error messaging closed without update".to_string(),
453                ));
454            }
455            Ok(Ok(tb)) => tb,
456        };
457
458        Ok(Self {
459            handle: Some(handle),
460            sender: Some(send),
461            transfer_backend,
462        })
463    }
464}
465
466impl ImageProcessorTrait for GLProcessorThreaded {
467    fn convert(
468        &mut self,
469        src: &TensorDyn,
470        dst: &mut TensorDyn,
471        rotation: crate::Rotation,
472        flip: Flip,
473        crop: Crop,
474    ) -> crate::Result<()> {
475        let (err_send, err_recv) = tokio::sync::oneshot::channel();
476        self.sender
477            .as_ref()
478            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
479            .blocking_send(GLProcessorMessage::ImageConvert(
480                SendablePtr {
481                    ptr: NonNull::from(src),
482                    len: 1,
483                },
484                SendablePtr {
485                    ptr: NonNull::from(dst),
486                    len: 1,
487                },
488                rotation,
489                flip,
490                crop,
491                err_send,
492            ))
493            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
494        err_recv.blocking_recv().map_err(|_| {
495            Error::Internal("GL converter error messaging closed without update".to_string())
496        })?
497    }
498
499    fn draw_masks(
500        &mut self,
501        dst: &mut TensorDyn,
502        detect: &[crate::DetectBox],
503        segmentation: &[crate::Segmentation],
504    ) -> crate::Result<()> {
505        let (err_send, err_recv) = tokio::sync::oneshot::channel();
506        self.sender
507            .as_ref()
508            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
509            .blocking_send(GLProcessorMessage::DrawMasks(
510                SendablePtr {
511                    ptr: NonNull::from(dst),
512                    len: 1,
513                },
514                SendablePtr {
515                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
516                    len: detect.len(),
517                },
518                SendablePtr {
519                    ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
520                    len: segmentation.len(),
521                },
522                err_send,
523            ))
524            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
525        err_recv.blocking_recv().map_err(|_| {
526            Error::Internal("GL converter error messaging closed without update".to_string())
527        })?
528    }
529
530    fn draw_masks_proto(
531        &mut self,
532        dst: &mut TensorDyn,
533        detect: &[DetectBox],
534        proto_data: &ProtoData,
535    ) -> crate::Result<()> {
536        let (err_send, err_recv) = tokio::sync::oneshot::channel();
537        self.sender
538            .as_ref()
539            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
540            .blocking_send(GLProcessorMessage::DrawMasksProto(
541                SendablePtr {
542                    ptr: NonNull::from(dst),
543                    len: 1,
544                },
545                SendablePtr {
546                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
547                    len: detect.len(),
548                },
549                Box::new(proto_data.clone()),
550                err_send,
551            ))
552            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
553        err_recv.blocking_recv().map_err(|_| {
554            Error::Internal("GL converter error messaging closed without update".to_string())
555        })?
556    }
557
558    fn decode_masks_atlas(
559        &mut self,
560        detect: &[DetectBox],
561        proto_data: ProtoData,
562        output_width: usize,
563        output_height: usize,
564    ) -> crate::Result<(Vec<u8>, Vec<MaskRegion>)> {
565        GLProcessorThreaded::decode_masks_atlas(
566            self,
567            detect,
568            proto_data,
569            output_width,
570            output_height,
571        )
572    }
573
574    fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
575        let (err_send, err_recv) = tokio::sync::oneshot::channel();
576        self.sender
577            .as_ref()
578            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
579            .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
580            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
581        err_recv.blocking_recv().map_err(|_| {
582            Error::Internal("GL converter error messaging closed without update".to_string())
583        })?
584    }
585}
586
587impl GLProcessorThreaded {
588    /// Sets the interpolation mode for int8 proto textures.
589    pub fn set_int8_interpolation_mode(
590        &mut self,
591        mode: Int8InterpolationMode,
592    ) -> Result<(), crate::Error> {
593        let (err_send, err_recv) = tokio::sync::oneshot::channel();
594        self.sender
595            .as_ref()
596            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
597            .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
598            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
599        err_recv.blocking_recv().map_err(|_| {
600            Error::Internal("GL converter error messaging closed without update".to_string())
601        })?
602    }
603
604    /// Decode all detection masks into a compact atlas via the GL thread.
605    ///
606    /// Returns `(atlas_pixels, regions)` where `atlas_pixels` is a contiguous
607    /// `Vec<u8>` of shape `[atlas_h, output_width]` (compact, bbox-sized strips)
608    /// and `regions` describes each detection's location within the atlas.
609    pub fn decode_masks_atlas(
610        &mut self,
611        detect: &[DetectBox],
612        proto_data: ProtoData,
613        output_width: usize,
614        output_height: usize,
615    ) -> Result<(Vec<u8>, Vec<MaskRegion>), crate::Error> {
616        let (resp_send, resp_recv) = tokio::sync::oneshot::channel();
617        self.sender
618            .as_ref()
619            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
620            .blocking_send(GLProcessorMessage::DecodeMasksAtlas(
621                SendablePtr {
622                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
623                    len: detect.len(),
624                },
625                Box::new(proto_data),
626                output_width,
627                output_height,
628                resp_send,
629            ))
630            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
631        resp_recv.blocking_recv().map_err(|_| {
632            Error::Internal("GL converter error messaging closed without update".to_string())
633        })?
634    }
635
636    /// Create a PBO-backed [`Tensor<u8>`] image on the GL thread.
637    pub fn create_pbo_image(
638        &self,
639        width: usize,
640        height: usize,
641        format: edgefirst_tensor::PixelFormat,
642    ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
643        let sender = self
644            .sender
645            .as_ref()
646            .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
647
648        let channels = format.channels();
649        let size = match format.layout() {
650            edgefirst_tensor::PixelLayout::SemiPlanar => {
651                // NV12: W*H*3/2, NV16: W*H*2
652                match format {
653                    edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
654                    edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
655                    _ => width * height * channels,
656                }
657            }
658            edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
659                width * height * channels
660            }
661            _ => width * height * channels,
662        };
663        if size == 0 {
664            return Err(Error::OpenGl("Invalid image dimensions".to_string()));
665        }
666
667        // Allocate PBO on the GL thread
668        let (tx, rx) = tokio::sync::oneshot::channel();
669        sender
670            .blocking_send(GLProcessorMessage::PboCreate(size, tx))
671            .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
672        let buffer_id = rx
673            .blocking_recv()
674            .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
675
676        let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
677            sender: sender.downgrade(),
678        });
679
680        let shape = match format.layout() {
681            edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
682            edgefirst_tensor::PixelLayout::SemiPlanar => {
683                let total_h = match format {
684                    edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
685                    edgefirst_tensor::PixelFormat::Nv16 => height * 2,
686                    _ => height * 2,
687                };
688                vec![total_h, width]
689            }
690            _ => vec![height, width, channels],
691        };
692
693        let pbo_tensor =
694            edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
695                .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
696        let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
697        tensor
698            .set_format(format)
699            .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
700        Ok(tensor)
701    }
702
703    /// Returns the active transfer backend.
704    pub(crate) fn transfer_backend(&self) -> TransferBackend {
705        self.transfer_backend
706    }
707}
708
709impl Drop for GLProcessorThreaded {
710    fn drop(&mut self) {
711        drop(self.sender.take());
712        let _ = self.handle.take().and_then(|h| h.join().ok());
713    }
714}