// Source file: edgefirst_image/gl/threaded.rs

1// SPDX-FileCopyrightText: Copyright 2025 Au-Zone Technologies
2// SPDX-License-Identifier: Apache-2.0
3
4use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::panic::AssertUnwindSafe;
6use std::ptr::NonNull;
7use std::thread::JoinHandle;
8use tokio::sync::mpsc::{Sender, WeakSender};
9
10use super::processor::GLProcessorST;
11use super::shaders::check_gl_error;
12use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
13use crate::{Crop, Error, Flip, ImageProcessorTrait, Rotation};
14use edgefirst_tensor::TensorDyn;
15
16#[allow(clippy::type_complexity)]
17enum GLProcessorMessage {
18    ImageConvert(
19        SendablePtr<TensorDyn>,
20        SendablePtr<TensorDyn>,
21        Rotation,
22        Flip,
23        Crop,
24        tokio::sync::oneshot::Sender<Result<(), Error>>,
25    ),
26    SetColors(
27        Vec<[u8; 4]>,
28        tokio::sync::oneshot::Sender<Result<(), Error>>,
29    ),
30    DrawDecodedMasks(
31        SendablePtr<TensorDyn>,
32        SendablePtr<DetectBox>,
33        SendablePtr<Segmentation>,
34        f32,                            // opacity
35        Option<SendablePtr<TensorDyn>>, // background
36        tokio::sync::oneshot::Sender<Result<(), Error>>,
37    ),
38    DrawProtoMasks(
39        SendablePtr<TensorDyn>,
40        SendablePtr<DetectBox>,
41        SendablePtr<ProtoData>,
42        f32,                            // opacity
43        Option<SendablePtr<TensorDyn>>, // background
44        tokio::sync::oneshot::Sender<Result<(), Error>>,
45    ),
46    SetInt8Interpolation(
47        Int8InterpolationMode,
48        tokio::sync::oneshot::Sender<Result<(), Error>>,
49    ),
50    PboCreate(
51        usize, // buffer size in bytes
52        tokio::sync::oneshot::Sender<Result<u32, Error>>,
53    ),
54    PboMap(
55        u32,   // buffer_id
56        usize, // size
57        tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
58    ),
59    PboUnmap(
60        u32, // buffer_id
61        tokio::sync::oneshot::Sender<Result<(), Error>>,
62    ),
63    PboDelete(u32), // fire-and-forget, no reply
64}
65
66/// Implements PboOps by sending commands to the GL thread.
67///
68/// Uses a `WeakSender` so that PBO images don't keep the GL thread's channel
69/// alive. When the `GLProcessorThreaded` is dropped, its `Sender` is the last
70/// strong reference — dropping it closes the channel and lets the GL thread
71/// exit. PBO operations after that return `PboDisconnected`.
72struct GlPboOps {
73    sender: WeakSender<GLProcessorMessage>,
74}
75
76// SAFETY: GlPboOps sends all GL operations to the dedicated GL thread via a
77// channel. `map_buffer` returns a CPU-visible pointer from `glMapBufferRange`
78// that remains valid until `unmap_buffer` calls `glUnmapBuffer` on the GL thread.
79// `delete_buffer` sends a fire-and-forget deletion command to the GL thread.
80unsafe impl edgefirst_tensor::PboOps for GlPboOps {
81    fn map_buffer(
82        &self,
83        buffer_id: u32,
84        size: usize,
85    ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
86        let sender = self
87            .sender
88            .upgrade()
89            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
90        let (tx, rx) = tokio::sync::oneshot::channel();
91        sender
92            .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
93            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
94        rx.blocking_recv()
95            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
96            .map_err(|e| {
97                edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
98            })
99    }
100
101    fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
102        let sender = self
103            .sender
104            .upgrade()
105            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
106        let (tx, rx) = tokio::sync::oneshot::channel();
107        sender
108            .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
109            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
110        rx.blocking_recv()
111            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
112            .map_err(|e| {
113                edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
114            })
115    }
116
117    fn delete_buffer(&self, buffer_id: u32) {
118        if let Some(sender) = self.sender.upgrade() {
119            let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
120        }
121    }
122}
123
124/// OpenGL multi-threaded image converter. The actual conversion is done in a
125/// separate rendering thread, as OpenGL contexts are not thread-safe. This can
126/// be safely sent between threads. The `convert()` call sends the conversion
127/// request to the rendering thread and waits for the result.
128#[derive(Debug)]
129pub struct GLProcessorThreaded {
130    // This is only None when the converter is being dropped.
131    handle: Option<JoinHandle<()>>,
132
133    // This is only None when the converter is being dropped.
134    sender: Option<Sender<GLProcessorMessage>>,
135    transfer_backend: TransferBackend,
136}
137
138unsafe impl Send for GLProcessorThreaded {}
139unsafe impl Sync for GLProcessorThreaded {}
140
/// Raw pointer plus element count that may cross the channel to the GL thread.
///
/// It carries no lifetime: whoever sends one must keep the pointee alive and
/// unaliased until the GL thread has replied.
struct SendablePtr<T: Send> {
    /// Pointer to the first (or only) element.
    ptr: NonNull<T>,
    /// Number of elements behind `ptr`; `1` for a single value.
    len: usize,
}

// SAFETY: the pointer is only dereferenced on the GL thread while the sending
// caller is blocked waiting for the reply, and the pointee type is `Send`.
unsafe impl<T: Send> Send for SendablePtr<T> {}
147
/// Turns a `catch_unwind` panic payload into printable text.
///
/// Payloads that are a `&str` or a `String` are returned as-is; any other
/// payload type yields `"unknown panic"`.
fn panic_message(info: &(dyn std::any::Any + Send)) -> String {
    info.downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| info.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_owned())
}
158
159impl GLProcessorThreaded {
160    /// Creates a new OpenGL multi-threaded image converter.
161    pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
162        let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
163
164        let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
165
166        let func = move || {
167            let mut gl_converter = match GLProcessorST::new(kind) {
168                Ok(gl) => gl,
169                Err(e) => {
170                    let _ = create_ctx_send.send(Err(e));
171                    return;
172                }
173            };
174            let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
175            let mut poisoned = false;
176            while let Some(msg) = recv.blocking_recv() {
177                // After a panic, the GL context is in an undefined state. Reject
178                // all subsequent messages with an error rather than risking wrong
179                // output or a GPU hang from corrupted GL state. This follows the
180                // same pattern as std::sync::Mutex poisoning.
181                if poisoned {
182                    let poison_err = crate::Error::Internal(
183                        "GL context is poisoned after a prior panic".to_string(),
184                    );
185                    match msg {
186                        GLProcessorMessage::ImageConvert(.., resp) => {
187                            let _ = resp.send(Err(poison_err));
188                        }
189                        GLProcessorMessage::DrawDecodedMasks(.., resp) => {
190                            let _ = resp.send(Err(poison_err));
191                        }
192                        GLProcessorMessage::DrawProtoMasks(.., resp) => {
193                            let _ = resp.send(Err(poison_err));
194                        }
195                        GLProcessorMessage::SetColors(_, resp) => {
196                            let _ = resp.send(Err(poison_err));
197                        }
198                        GLProcessorMessage::SetInt8Interpolation(_, resp) => {
199                            let _ = resp.send(Err(poison_err));
200                        }
201                        GLProcessorMessage::PboCreate(_, resp) => {
202                            let _ = resp.send(Err(poison_err));
203                        }
204                        GLProcessorMessage::PboMap(_, _, resp) => {
205                            let _ = resp.send(Err(poison_err));
206                        }
207                        GLProcessorMessage::PboUnmap(_, resp) => {
208                            let _ = resp.send(Err(poison_err));
209                        }
210                        GLProcessorMessage::PboDelete(_) => {}
211                    }
212                    continue;
213                }
214
215                match msg {
216                    GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
217                        // SAFETY: This is safe because the convert() function waits for the resp to
218                        // be sent before dropping the borrow for src and dst
219                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
220                            let src = unsafe { src.ptr.as_ref() };
221                            let dst = unsafe { dst.ptr.as_mut() };
222                            gl_converter.convert(src, dst, rotation, flip, crop)
223                        }));
224                        let _ = resp.send(match result {
225                            Ok(res) => res,
226                            Err(e) => {
227                                poisoned = true;
228                                Err(crate::Error::Internal(format!(
229                                    "GL thread panicked during ImageConvert: {}",
230                                    panic_message(e.as_ref()),
231                                )))
232                            }
233                        });
234                    }
235                    GLProcessorMessage::DrawDecodedMasks(mut dst, det, seg, opacity, bg, resp) => {
236                        // SAFETY: This is safe because the draw_decoded_masks() function waits for the
237                        // resp to be sent before dropping the borrow for dst, detect,
238                        // segmentation, and background
239                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
240                            let dst = unsafe { dst.ptr.as_mut() };
241                            let det =
242                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
243                            let seg =
244                                unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
245                            let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
246                            gl_converter.draw_decoded_masks(
247                                dst,
248                                det,
249                                seg,
250                                crate::MaskOverlay {
251                                    background: bg_ref,
252                                    opacity,
253                                },
254                            )
255                        }));
256                        let _ = resp.send(match result {
257                            Ok(res) => res,
258                            Err(e) => {
259                                poisoned = true;
260                                Err(crate::Error::Internal(format!(
261                                    "GL thread panicked during DrawDecodedMasks: {}",
262                                    panic_message(e.as_ref()),
263                                )))
264                            }
265                        });
266                    }
267                    GLProcessorMessage::DrawProtoMasks(
268                        mut dst,
269                        det,
270                        proto_data,
271                        opacity,
272                        bg,
273                        resp,
274                    ) => {
275                        // SAFETY: Same safety invariant as DrawDecodedMasks — caller
276                        // blocks on resp before dropping borrows.
277                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
278                            let dst = unsafe { dst.ptr.as_mut() };
279                            let det =
280                                unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
281                            let bg_ref = bg.map(|p| unsafe { &*p.ptr.as_ptr() });
282                            let proto_data = unsafe { proto_data.ptr.as_ref() };
283                            gl_converter.draw_proto_masks(
284                                dst,
285                                det,
286                                proto_data,
287                                crate::MaskOverlay {
288                                    background: bg_ref,
289                                    opacity,
290                                },
291                            )
292                        }));
293                        let _ = resp.send(match result {
294                            Ok(res) => res,
295                            Err(e) => {
296                                poisoned = true;
297                                Err(crate::Error::Internal(format!(
298                                    "GL thread panicked during DrawProtoMasks: {}",
299                                    panic_message(e.as_ref()),
300                                )))
301                            }
302                        });
303                    }
304                    GLProcessorMessage::SetColors(colors, resp) => {
305                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
306                            gl_converter.set_class_colors(&colors)
307                        }));
308                        let _ = resp.send(match result {
309                            Ok(res) => res,
310                            Err(e) => {
311                                poisoned = true;
312                                Err(crate::Error::Internal(format!(
313                                    "GL thread panicked during SetColors: {}",
314                                    panic_message(e.as_ref()),
315                                )))
316                            }
317                        });
318                    }
319                    GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
320                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
321                            gl_converter.set_int8_interpolation_mode(mode);
322                            Ok(())
323                        }));
324                        let _ = resp.send(match result {
325                            Ok(res) => res,
326                            Err(e) => {
327                                poisoned = true;
328                                Err(crate::Error::Internal(format!(
329                                    "GL thread panicked during SetInt8Interpolation: {}",
330                                    panic_message(e.as_ref()),
331                                )))
332                            }
333                        });
334                    }
335                    GLProcessorMessage::PboCreate(size, resp) => {
336                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
337                            let mut id: u32 = 0;
338                            gls::gl::GenBuffers(1, &mut id);
339                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
340                            gls::gl::BufferData(
341                                gls::gl::PIXEL_PACK_BUFFER,
342                                size as isize,
343                                std::ptr::null(),
344                                gls::gl::STREAM_COPY,
345                            );
346                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
347                            match check_gl_error("PboCreate", 0) {
348                                Ok(()) => Ok(id),
349                                Err(e) => {
350                                    gls::gl::DeleteBuffers(1, &id);
351                                    Err(e)
352                                }
353                            }
354                        }));
355                        let _ = resp.send(match result {
356                            Ok(res) => res,
357                            Err(e) => {
358                                poisoned = true;
359                                Err(crate::Error::Internal(format!(
360                                    "GL thread panicked during PboCreate: {}",
361                                    panic_message(e.as_ref()),
362                                )))
363                            }
364                        });
365                    }
366                    GLProcessorMessage::PboMap(buffer_id, size, resp) => {
367                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
368                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
369                            let ptr = gls::gl::MapBufferRange(
370                                gls::gl::PIXEL_PACK_BUFFER,
371                                0,
372                                size as isize,
373                                gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
374                            );
375                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
376                            if ptr.is_null() {
377                                Err(crate::Error::OpenGl(
378                                    "glMapBufferRange returned null".to_string(),
379                                ))
380                            } else {
381                                Ok(edgefirst_tensor::PboMapping {
382                                    ptr: ptr as *mut u8,
383                                    size,
384                                })
385                            }
386                        }));
387                        let _ = resp.send(match result {
388                            Ok(res) => res,
389                            Err(e) => {
390                                poisoned = true;
391                                Err(crate::Error::Internal(format!(
392                                    "GL thread panicked during PboMap: {}",
393                                    panic_message(e.as_ref()),
394                                )))
395                            }
396                        });
397                    }
398                    GLProcessorMessage::PboUnmap(buffer_id, resp) => {
399                        let result = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
400                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
401                            let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
402                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
403                            if ok == gls::gl::FALSE {
404                                Err(Error::OpenGl(
405                                    "PBO data was corrupted during mapping".into(),
406                                ))
407                            } else {
408                                check_gl_error("PboUnmap", 0)
409                            }
410                        }));
411                        let _ = resp.send(match result {
412                            Ok(res) => res,
413                            Err(e) => {
414                                poisoned = true;
415                                Err(crate::Error::Internal(format!(
416                                    "GL thread panicked during PboUnmap: {}",
417                                    panic_message(e.as_ref()),
418                                )))
419                            }
420                        });
421                    }
422                    GLProcessorMessage::PboDelete(buffer_id) => {
423                        if let Err(e) = std::panic::catch_unwind(AssertUnwindSafe(|| unsafe {
424                            gls::gl::DeleteBuffers(1, &buffer_id);
425                        })) {
426                            poisoned = true;
427                            log::error!(
428                                "GL thread panicked during PboDelete: {}",
429                                panic_message(e.as_ref()),
430                            );
431                        }
432                    }
433                }
434            }
435        };
436
437        // let handle = tokio::task::spawn(func());
438        let handle = std::thread::spawn(func);
439
440        let transfer_backend = match create_ctx_recv.blocking_recv() {
441            Ok(Err(e)) => return Err(e),
442            Err(_) => {
443                return Err(Error::Internal(
444                    "GL converter error messaging closed without update".to_string(),
445                ));
446            }
447            Ok(Ok(tb)) => tb,
448        };
449
450        Ok(Self {
451            handle: Some(handle),
452            sender: Some(send),
453            transfer_backend,
454        })
455    }
456}
457
458impl ImageProcessorTrait for GLProcessorThreaded {
459    fn convert(
460        &mut self,
461        src: &TensorDyn,
462        dst: &mut TensorDyn,
463        rotation: crate::Rotation,
464        flip: Flip,
465        crop: Crop,
466    ) -> crate::Result<()> {
467        let (err_send, err_recv) = tokio::sync::oneshot::channel();
468        self.sender
469            .as_ref()
470            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
471            .blocking_send(GLProcessorMessage::ImageConvert(
472                SendablePtr {
473                    ptr: NonNull::from(src),
474                    len: 1,
475                },
476                SendablePtr {
477                    ptr: NonNull::from(dst),
478                    len: 1,
479                },
480                rotation,
481                flip,
482                crop,
483                err_send,
484            ))
485            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
486        err_recv.blocking_recv().map_err(|_| {
487            Error::Internal("GL converter error messaging closed without update".to_string())
488        })?
489    }
490
491    fn draw_decoded_masks(
492        &mut self,
493        dst: &mut TensorDyn,
494        detect: &[crate::DetectBox],
495        segmentation: &[crate::Segmentation],
496        overlay: crate::MaskOverlay<'_>,
497    ) -> crate::Result<()> {
498        let (err_send, err_recv) = tokio::sync::oneshot::channel();
499        self.sender
500            .as_ref()
501            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
502            .blocking_send(GLProcessorMessage::DrawDecodedMasks(
503                SendablePtr {
504                    ptr: NonNull::from(dst),
505                    len: 1,
506                },
507                SendablePtr {
508                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
509                    len: detect.len(),
510                },
511                SendablePtr {
512                    ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
513                    len: segmentation.len(),
514                },
515                overlay.opacity,
516                overlay.background.map(|bg| SendablePtr {
517                    ptr: NonNull::from(bg).cast::<TensorDyn>(),
518                    len: 1,
519                }),
520                err_send,
521            ))
522            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
523        err_recv.blocking_recv().map_err(|_| {
524            Error::Internal("GL converter error messaging closed without update".to_string())
525        })?
526    }
527
528    fn draw_proto_masks(
529        &mut self,
530        dst: &mut TensorDyn,
531        detect: &[DetectBox],
532        proto_data: &ProtoData,
533        overlay: crate::MaskOverlay<'_>,
534    ) -> crate::Result<()> {
535        let (err_send, err_recv) = tokio::sync::oneshot::channel();
536        self.sender
537            .as_ref()
538            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
539            .blocking_send(GLProcessorMessage::DrawProtoMasks(
540                SendablePtr {
541                    ptr: NonNull::from(dst),
542                    len: 1,
543                },
544                SendablePtr {
545                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
546                    len: detect.len(),
547                },
548                SendablePtr {
549                    ptr: NonNull::from(proto_data).cast::<ProtoData>(),
550                    len: 1,
551                },
552                overlay.opacity,
553                overlay.background.map(|bg| SendablePtr {
554                    ptr: NonNull::from(bg).cast::<TensorDyn>(),
555                    len: 1,
556                }),
557                err_send,
558            ))
559            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
560        err_recv.blocking_recv().map_err(|_| {
561            Error::Internal("GL converter error messaging closed without update".to_string())
562        })?
563    }
564
565    fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
566        let (err_send, err_recv) = tokio::sync::oneshot::channel();
567        self.sender
568            .as_ref()
569            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
570            .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
571            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
572        err_recv.blocking_recv().map_err(|_| {
573            Error::Internal("GL converter error messaging closed without update".to_string())
574        })?
575    }
576}
577
578impl GLProcessorThreaded {
579    /// Sets the interpolation mode for int8 proto textures.
580    pub fn set_int8_interpolation_mode(
581        &mut self,
582        mode: Int8InterpolationMode,
583    ) -> Result<(), crate::Error> {
584        let (err_send, err_recv) = tokio::sync::oneshot::channel();
585        self.sender
586            .as_ref()
587            .ok_or_else(|| Error::Internal("GL processor is shutting down".to_string()))?
588            .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
589            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
590        err_recv.blocking_recv().map_err(|_| {
591            Error::Internal("GL converter error messaging closed without update".to_string())
592        })?
593    }
594
595    /// Create a PBO-backed [`Tensor<u8>`] image on the GL thread.
596    pub fn create_pbo_image(
597        &self,
598        width: usize,
599        height: usize,
600        format: edgefirst_tensor::PixelFormat,
601    ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
602        let sender = self
603            .sender
604            .as_ref()
605            .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
606
607        let channels = format.channels();
608        let size = match format.layout() {
609            edgefirst_tensor::PixelLayout::SemiPlanar => {
610                // NV12: W*H*3/2, NV16: W*H*2
611                match format {
612                    edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
613                    edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
614                    _ => width * height * channels,
615                }
616            }
617            edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
618                width * height * channels
619            }
620            _ => width * height * channels,
621        };
622        if size == 0 {
623            return Err(Error::OpenGl("Invalid image dimensions".to_string()));
624        }
625
626        // Allocate PBO on the GL thread
627        let (tx, rx) = tokio::sync::oneshot::channel();
628        sender
629            .blocking_send(GLProcessorMessage::PboCreate(size, tx))
630            .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
631        let buffer_id = rx
632            .blocking_recv()
633            .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
634
635        let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
636            sender: sender.downgrade(),
637        });
638
639        let shape = match format.layout() {
640            edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
641            edgefirst_tensor::PixelLayout::SemiPlanar => {
642                let total_h = match format {
643                    edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
644                    edgefirst_tensor::PixelFormat::Nv16 => height * 2,
645                    _ => height * 2,
646                };
647                vec![total_h, width]
648            }
649            _ => vec![height, width, channels],
650        };
651
652        let pbo_tensor =
653            edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
654                .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
655        let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
656        tensor
657            .set_format(format)
658            .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
659        Ok(tensor)
660    }
661
662    /// Returns the active transfer backend.
663    pub(crate) fn transfer_backend(&self) -> TransferBackend {
664        self.transfer_backend
665    }
666}
667
668impl Drop for GLProcessorThreaded {
669    fn drop(&mut self) {
670        drop(self.sender.take());
671        let _ = self.handle.take().and_then(|h| h.join().ok());
672    }
673}