Skip to main content

edgefirst_image/gl/
threaded.rs

1// SPDX-FileCopyrightText: Copyright 2025 Au-Zone Technologies
2// SPDX-License-Identifier: Apache-2.0
3
4use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, Rotation};
14use edgefirst_tensor::TensorDyn;
15
16#[allow(clippy::type_complexity)]
17enum GLProcessorMessage {
18    ImageConvert(
19        SendablePtr<TensorDyn>,
20        SendablePtr<TensorDyn>,
21        Rotation,
22        Flip,
23        Crop,
24        tokio::sync::oneshot::Sender<Result<(), Error>>,
25    ),
26    SetColors(
27        Vec<[u8; 4]>,
28        tokio::sync::oneshot::Sender<Result<(), Error>>,
29    ),
30    DrawDecodedMasks(
31        SendablePtr<TensorDyn>,
32        SendablePtr<DetectBox>,
33        SendablePtr<Segmentation>,
34        f32,                            // opacity
35        Option<SendablePtr<TensorDyn>>, // background
36        Option<[f32; 4]>,               // letterbox
37        crate::ColorMode,
38        tokio::sync::oneshot::Sender<Result<(), Error>>,
39    ),
40    DrawProtoMasks(
41        SendablePtr<TensorDyn>,
42        SendablePtr<DetectBox>,
43        SendablePtr<ProtoData>,
44        f32,                            // opacity
45        Option<SendablePtr<TensorDyn>>, // background
46        Option<[f32; 4]>,               // letterbox
47        crate::ColorMode,
48        tokio::sync::oneshot::Sender<Result<(), Error>>,
49    ),
50    SetInt8Interpolation(
51        Int8InterpolationMode,
52        tokio::sync::oneshot::Sender<Result<(), Error>>,
53    ),
54    PboCreate(
55        usize, // buffer size in bytes
56        tokio::sync::oneshot::Sender<Result<u32, Error>>,
57    ),
58    PboMap(
59        u32,   // buffer_id
60        usize, // size
61        tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
62    ),
63    PboUnmap(
64        u32, // buffer_id
65        tokio::sync::oneshot::Sender<Result<(), Error>>,
66    ),
67    PboDelete(u32), // fire-and-forget, no reply
68}
69
70/// Implements PboOps by sending commands to the GL thread.
71///
72/// Uses a `WeakSender` so that PBO images don't keep the GL thread's channel
73/// alive. When the `GLProcessorThreaded` is dropped, its `Sender` is the last
74/// strong reference — dropping it closes the channel and lets the GL thread
75/// exit. PBO operations after that return `PboDisconnected`.
76struct GlPboOps {
77    sender: WeakSender<GLProcessorMessage>,
78}
79
80// SAFETY: GlPboOps sends all GL operations to the dedicated GL thread via a
81// channel. `map_buffer` returns a CPU-visible pointer from `glMapBufferRange`
82// that remains valid until `unmap_buffer` calls `glUnmapBuffer` on the GL thread.
83// `delete_buffer` sends a fire-and-forget deletion command to the GL thread.
84unsafe impl edgefirst_tensor::PboOps for GlPboOps {
85    fn map_buffer(
86        &self,
87        buffer_id: u32,
88        size: usize,
89    ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
90        let sender = self
91            .sender
92            .upgrade()
93            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
94        let (tx, rx) = tokio::sync::oneshot::channel();
95        sender
96            .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
97            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
98        rx.blocking_recv()
99            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
100            .map_err(|e| {
101                edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
102            })
103    }
104
105    fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
106        let sender = self
107            .sender
108            .upgrade()
109            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
110        let (tx, rx) = tokio::sync::oneshot::channel();
111        sender
112            .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
113            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
114        rx.blocking_recv()
115            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
116            .map_err(|e| {
117                edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
118            })
119    }
120
121    fn delete_buffer(&self, buffer_id: u32) {
122        if let Some(sender) = self.sender.upgrade() {
123            let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
124        }
125    }
126}
127
128/// OpenGL multi-threaded image converter. The actual conversion is done in a
129/// separate rendering thread, as OpenGL contexts are not thread-safe. This can
130/// be safely sent between threads. The `convert()` call sends the conversion
131/// request to the rendering thread and waits for the result.
132#[derive(Debug)]
133pub struct GLProcessorThreaded {
134    // This is only None when the converter is being dropped.
135    handle: Option<JoinHandle<()>>,
136
137    // This is only None when the converter is being dropped.
138    sender: Option<Sender<GLProcessorMessage>>,
139    transfer_backend: TransferBackend,
140}
141
142unsafe impl Send for GLProcessorThreaded {}
143unsafe impl Sync for GLProcessorThreaded {}
144
145struct SendablePtr<T: Send> {
146    ptr: NonNull<T>,
147    len: usize,
148}
149
150unsafe impl<T> Send for SendablePtr<T> where T: Send {}
151
152/// Extract a human-readable message from a `catch_unwind` panic payload.
153fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
154    if let Some(s) = info.downcast_ref::<&str>() {
155        s.to_string()
156    } else if let Some(s) = info.downcast_ref::<String>() {
157        s.clone()
158    } else {
159        "unknown panic".to_string()
160    }
161}
162
163impl GLProcessorThreaded {
164    /// Creates a new OpenGL multi-threaded image converter.
165    pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
166        let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
167
168        let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
169
170        let func = move || {
171            let init_result = {
172                let _guard = super::context::GL_MUTEX
173                    .lock()
174                    .unwrap_or_else(|e| e.into_inner());
175                GLProcessorST::new(kind)
176            };
177            let mut gl_converter = match init_result {
178                Ok(gl) => gl,
179                Err(e) => {
180                    let _ = create_ctx_send.send(Err(e));
181                    return;
182                }
183            };
184            let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
185            let mut poisoned = false;
186            while let Some(msg) = recv.blocking_recv() {
187                // Serialize all GL operations across GLProcessorST instances.
188                // See `GL_MUTEX` doc comment in context.rs for rationale.
189                let _guard = super::context::GL_MUTEX
190                    .lock()
191                    .unwrap_or_else(|e| e.into_inner());
192
193                // After a panic, the GL context is in an undefined state. Reject
194                // all subsequent messages with an error rather than risking wrong
195                // output or a GPU hang from corrupted GL state. This follows the
196                // same pattern as std::sync::Mutex poisoning.
197                if poisoned {
198                    let poison_err = crate::Error::Internal(
199                        "GL context is poisoned after a prior panic".to_string(),
200                    );
201                    match msg {
202                        GLProcessorMessage::ImageConvert(.., resp) => {
203                            let _ = resp.send(Err(poison_err));
204                        }
205                        GLProcessorMessage::DrawDecodedMasks(.., resp) => {
206                            let _ = resp.send(Err(poison_err));
207                        }
208                        GLProcessorMessage::DrawProtoMasks(.., resp) => {
209                            let _ = resp.send(Err(poison_err));
210                        }
211                        GLProcessorMessage::SetColors(_, resp) => {
212                            let _ = resp.send(Err(poison_err));
213                        }
214                        GLProcessorMessage::SetInt8Interpolation(_, resp) => {
215                            let _ = resp.send(Err(poison_err));
216                        }
217                        GLProcessorMessage::PboCreate(_, resp) => {
218                            let _ = resp.send(Err(poison_err));
219                        }
220                        GLProcessorMessage::PboMap(_, _, resp) => {
221                            let _ = resp.send(Err(poison_err));
222                        }
223                        GLProcessorMessage::PboUnmap(_, resp) => {
224                            let _ = resp.send(Err(poison_err));
225                        }
226                        GLProcessorMessage::PboDelete(_) => {}
227                    }
228                    continue;
229                }
230
231                match msg {
232                    GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
233                        // SAFETY: This is safe because the convert() function waits for the resp to
234                        // be sent before dropping the borrow for src and dst
235                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
236                            let src = unsafe { src.ptr.as_ref() };
237                            let dst = unsafe { dst.ptr.as_mut() };
238                            gl_converter.convert(src, dst, rotation, flip, crop)
239                        }));
240                        let _ = resp.send(match result {
241                            Ok(res) => res,
242                            Err(e) => {
243                                poisoned = true;
244                                Err(crate::Error::Internal(format!(
245                                    "GL thread panicked during ImageConvert: {}",
246                                    panic_message(e.as_ref()),
247                                )))
248                            }
249                        });
250                    }
251                    GLProcessorMessage::DrawDecodedMasks(
252                        mut dst,
253                        det,
254                        seg,
255                        opacity,
256                        bg,
257                        letterbox,
258                        color_mode,
259                        resp,
260                    ) => {
261                        // SAFETY: This is safe because the draw_decoded_masks() function waits for the
262                        // resp to be sent before dropping the borrow for dst, detect,
263                        // segmentation, and background
264                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
265                            let dst = unsafe { dst.ptr.as_mut() };
266                            let det =
267                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
268                            let seg =
269                                unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
270                            let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
271                            gl_converter.draw_decoded_masks(
272                                dst,
273                                det,
274                                seg,
275                                crate::MaskOverlay {
276                                    background: bg_ref,
277                                    opacity,
278                                    letterbox,
279                                    color_mode,
280                                },
281                            )
282                        }));
283                        let _ = resp.send(match result {
284                            Ok(res) => res,
285                            Err(e) => {
286                                poisoned = true;
287                                Err(crate::Error::Internal(format!(
288                                    "GL thread panicked during DrawDecodedMasks: {}",
289                                    panic_message(e.as_ref()),
290                                )))
291                            }
292                        });
293                    }
294                    GLProcessorMessage::DrawProtoMasks(
295                        mut dst,
296                        det,
297                        proto_data,
298                        opacity,
299                        bg,
300                        letterbox,
301                        color_mode,
302                        resp,
303                    ) => {
304                        // SAFETY: Same safety invariant as DrawDecodedMasks — caller
305                        // blocks on resp before dropping borrows.
306                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
307                            let dst = unsafe { dst.ptr.as_mut() };
308                            let det =
309                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
310                            let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
311                            let proto_data = unsafe { proto_data.ptr.as_ref() };
312                            gl_converter.draw_proto_masks(
313                                dst,
314                                det,
315                                proto_data,
316                                crate::MaskOverlay {
317                                    background: bg_ref,
318                                    opacity,
319                                    letterbox,
320                                    color_mode,
321                                },
322                            )
323                        }));
324                        let _ = resp.send(match result {
325                            Ok(res) => res,
326                            Err(e) => {
327                                poisoned = true;
328                                Err(crate::Error::Internal(format!(
329                                    "GL thread panicked during DrawProtoMasks: {}",
330                                    panic_message(e.as_ref()),
331                                )))
332                            }
333                        });
334                    }
335                    GLProcessorMessage::SetColors(colors, resp) => {
336                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
337                            gl_converter.set_class_colors(&colors)
338                        }));
339                        let _ = resp.send(match result {
340                            Ok(res) => res,
341                            Err(e) => {
342                                poisoned = true;
343                                Err(crate::Error::Internal(format!(
344                                    "GL thread panicked during SetColors: {}",
345                                    panic_message(e.as_ref()),
346                                )))
347                            }
348                        });
349                    }
350                    GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
351                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
352                            gl_converter.set_int8_interpolation_mode(mode);
353                            Ok(())
354                        }));
355                        let _ = resp.send(match result {
356                            Ok(res) => res,
357                            Err(e) => {
358                                poisoned = true;
359                                Err(crate::Error::Internal(format!(
360                                    "GL thread panicked during SetInt8Interpolation: {}",
361                                    panic_message(e.as_ref()),
362                                )))
363                            }
364                        });
365                    }
366                    GLProcessorMessage::PboCreate(size, resp) => {
367                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
368                            let mut id: u32 = 0;
369                            gls::gl::GenBuffers(1, &mut id);
370                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
371                            gls::gl::BufferData(
372                                gls::gl::PIXEL_PACK_BUFFER,
373                                size as isize,
374                                std::ptr::null(),
375                                gls::gl::STREAM_COPY,
376                            );
377                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
378                            match check_gl_error("PboCreate", 0) {
379                                Ok(()) => Ok(id),
380                                Err(e) => {
381                                    gls::gl::DeleteBuffers(1, &id);
382                                    Err(e)
383                                }
384                            }
385                        }));
386                        let _ = resp.send(match result {
387                            Ok(res) => res,
388                            Err(e) => {
389                                poisoned = true;
390                                Err(crate::Error::Internal(format!(
391                                    "GL thread panicked during PboCreate: {}",
392                                    panic_message(e.as_ref()),
393                                )))
394                            }
395                        });
396                    }
397                    GLProcessorMessage::PboMap(buffer_id, size, resp) => {
398                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
399                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
400                            let ptr = gls::gl::MapBufferRange(
401                                gls::gl::PIXEL_PACK_BUFFER,
402                                0,
403                                size as isize,
404                                gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
405                            );
406                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
407                            if ptr.is_null() {
408                                Err(crate::Error::OpenGl(
409                                    "glMapBufferRange returned null".to_string(),
410                                ))
411                            } else {
412                                Ok(edgefirst_tensor::PboMapping {
413                                    ptr: ptr as *mut u8,
414                                    size,
415                                })
416                            }
417                        }));
418                        let _ = resp.send(match result {
419                            Ok(res) => res,
420                            Err(e) => {
421                                poisoned = true;
422                                Err(crate::Error::Internal(format!(
423                                    "GL thread panicked during PboMap: {}",
424                                    panic_message(e.as_ref()),
425                                )))
426                            }
427                        });
428                    }
429                    GLProcessorMessage::PboUnmap(buffer_id, resp) => {
430                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
431                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
432                            let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
433                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
434                            if ok == gls::gl::FALSE {
435                                Err(Error::OpenGl(
436                                    "PBO data was corrupted during mapping".into(),
437                                ))
438                            } else {
439                                check_gl_error("PboUnmap", 0)
440                            }
441                        }));
442                        let _ = resp.send(match result {
443                            Ok(res) => res,
444                            Err(e) => {
445                                poisoned = true;
446                                Err(crate::Error::Internal(format!(
447                                    "GL thread panicked during PboUnmap: {}",
448                                    panic_message(e.as_ref()),
449                                )))
450                            }
451                        });
452                    }
453                    GLProcessorMessage::PboDelete(buffer_id) => {
454                        if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
455                            gls::gl::DeleteBuffers(1, &buffer_id);
456                        })) {
457                            poisoned = true;
458                            log::error!(
459                                "GL thread panicked during PboDelete: {}",
460                                panic_message(e.as_ref()),
461                            );
462                        }
463                    }
464                }
465            }
466            // Explicitly drop under the mutex so EGL teardown is serialized.
467            let _guard = super::context::GL_MUTEX
468                .lock()
469                .unwrap_or_else(|e| e.into_inner());
470            drop(gl_converter);
471        };
472
473        // let handle = tokio::task::spawn(func());
474        let handle = std::thread::spawn(func);
475
476        let transfer_backend = match create_ctx_recv.blocking_recv() {
477            Ok(Err(e)) => return Err(e),
478            Err(_) => {
479                return Err(Error::Internal(
480                    "GL converter error messaging closed without update".to_string(),
481                ));
482            }
483            Ok(Ok(tb)) => tb,
484        };
485
486        Ok(Self {
487            handle: Some(handle),
488            sender: Some(send),
489            transfer_backend,
490        })
491    }
492}
493
494impl ImageProcessorTrait for GLProcessorThreaded {
495    fn convert(
496        &mut self,
497        src: &TensorDyn,
498        dst: &mut TensorDyn,
499        rotation: crate::Rotation,
500        flip: Flip,
501        crop: Crop,
502    ) -> crate::Result<()> {
503        let (err_send, err_recv) = tokio::sync::oneshot::channel();
504        self.sender
505            .as_ref()
506            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
507            .blocking_send(GLProcessorMessage::ImageConvert(
508                SendablePtr {
509                    ptr: NonNull::from(src),
510                    len: 1,
511                },
512                SendablePtr {
513                    ptr: NonNull::from(dst),
514                    len: 1,
515                },
516                rotation,
517                flip,
518                crop,
519                err_send,
520            ))
521            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
522        err_recv.blocking_recv().map_err(|_| {
523            Error::Internal("GL converter error messaging closed without update".to_string())
524        })?
525    }
526
527    fn draw_decoded_masks(
528        &mut self,
529        dst: &mut TensorDyn,
530        detect: &[crate::DetectBox],
531        segmentation: &[crate::Segmentation],
532        overlay: crate::MaskOverlay<'_>,
533    ) -> crate::Result<()> {
534        let (err_send, err_recv) = tokio::sync::oneshot::channel();
535        self.sender
536            .as_ref()
537            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
538            .blocking_send(GLProcessorMessage::DrawDecodedMasks(
539                SendablePtr {
540                    ptr: NonNull::from(dst),
541                    len: 1,
542                },
543                SendablePtr {
544                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
545                    len: detect.len(),
546                },
547                SendablePtr {
548                    ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
549                    len: segmentation.len(),
550                },
551                overlay.opacity,
552                overlay.background.map(|bg| SendablePtr {
553                    ptr: NonNull::from(bg).cast::<TensorDyn>(),
554                    len: 1,
555                }),
556                overlay.letterbox,
557                overlay.color_mode,
558                err_send,
559            ))
560            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
561        err_recv.blocking_recv().map_err(|_| {
562            Error::Internal("GL converter error messaging closed without update".to_string())
563        })?
564    }
565
566    fn draw_proto_masks(
567        &mut self,
568        dst: &mut TensorDyn,
569        detect: &[DetectBox],
570        proto_data: &ProtoData,
571        overlay: crate::MaskOverlay<'_>,
572    ) -> crate::Result<()> {
573        let (err_send, err_recv) = tokio::sync::oneshot::channel();
574        self.sender
575            .as_ref()
576            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
577            .blocking_send(GLProcessorMessage::DrawProtoMasks(
578                SendablePtr {
579                    ptr: NonNull::from(dst),
580                    len: 1,
581                },
582                SendablePtr {
583                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
584                    len: detect.len(),
585                },
586                SendablePtr {
587                    ptr: NonNull::from(proto_data).cast::<ProtoData>(),
588                    len: 1,
589                },
590                overlay.opacity,
591                overlay.background.map(|bg| SendablePtr {
592                    ptr: NonNull::from(bg).cast::<TensorDyn>(),
593                    len: 1,
594                }),
595                overlay.letterbox,
596                overlay.color_mode,
597                err_send,
598            ))
599            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
600        err_recv.blocking_recv().map_err(|_| {
601            Error::Internal("GL converter error messaging closed without update".to_string())
602        })?
603    }
604
605    fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
606        let (err_send, err_recv) = tokio::sync::oneshot::channel();
607        self.sender
608            .as_ref()
609            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
610            .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
611            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
612        err_recv.blocking_recv().map_err(|_| {
613            Error::Internal("GL converter error messaging closed without update".to_string())
614        })?
615    }
616}
617
618impl GLProcessorThreaded {
619    /// Sets the interpolation mode for int8 proto textures.
620    pub fn set_int8_interpolation_mode(
621        &mut self,
622        mode: Int8InterpolationMode,
623    ) -> Result<(), crate::Error> {
624        let (err_send, err_recv) = tokio::sync::oneshot::channel();
625        self.sender
626            .as_ref()
627            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
628            .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
629            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
630        err_recv.blocking_recv().map_err(|_| {
631            Error::Internal("GL converter error messaging closed without update".to_string())
632        })?
633    }
634
635    /// Create a PBO-backed [`Tensor<u8>`] image on the GL thread.
636    pub fn create_pbo_image(
637        &self,
638        width: usize,
639        height: usize,
640        format: edgefirst_tensor::PixelFormat,
641    ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
642        let sender = self
643            .sender
644            .as_ref()
645            .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
646
647        let channels = format.channels();
648        let size = match format.layout() {
649            edgefirst_tensor::PixelLayout::SemiPlanar => {
650                // NV12: W*H*3/2, NV16: W*H*2
651                match format {
652                    edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
653                    edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
654                    _ => width * height * channels,
655                }
656            }
657            edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
658                width * height * channels
659            }
660            _ => width * height * channels,
661        };
662        if size == 0 {
663            return Err(Error::OpenGl("Invalid image dimensions".to_string()));
664        }
665
666        // Allocate PBO on the GL thread
667        let (tx, rx) = tokio::sync::oneshot::channel();
668        sender
669            .blocking_send(GLProcessorMessage::PboCreate(size, tx))
670            .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
671        let buffer_id = rx
672            .blocking_recv()
673            .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
674
675        let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
676            sender: sender.downgrade(),
677        });
678
679        let shape = match format.layout() {
680            edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
681            edgefirst_tensor::PixelLayout::SemiPlanar => {
682                let total_h = match format {
683                    edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
684                    edgefirst_tensor::PixelFormat::Nv16 => height * 2,
685                    _ => height * 2,
686                };
687                vec![total_h, width]
688            }
689            _ => vec![height, width, channels],
690        };
691
692        let pbo_tensor =
693            edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
694                .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
695        let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
696        tensor
697            .set_format(format)
698            .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
699        Ok(tensor)
700    }
701
702    /// Returns the active transfer backend.
703    pub(crate) fn transfer_backend(&self) -> TransferBackend {
704        self.transfer_backend
705    }
706}
707
708impl Drop for GLProcessorThreaded {
709    fn drop(&mut self) {
710        drop(self.sender.take());
711        let _ = self.handle.take().and_then(|h| h.join().ok());
712    }
713}