Skip to main content

edgefirst_image/gl/
threaded.rs

1// SPDX-FileCopyrightText: Copyright 2025 Au-Zone Technologies
2// SPDX-License-Identifier: Apache-2.0
3
4use edgefirst_decoder::{DetectBox, ProtoData, Segmentation};
5use std::ptr::NonNull;
6use std::thread::JoinHandle;
7use tokio::sync::mpsc::{Sender, WeakSender};
8
9use super::processor::GLProcessorST;
10use super::shaders::check_gl_error;
11use super::{EglDisplayKind, Int8InterpolationMode, TransferBackend};
12use crate::{Crop, Error, Flip, ImageProcessorTrait, MaskRegion, Rotation};
13use edgefirst_tensor::TensorDyn;
14
15#[allow(clippy::type_complexity)]
16enum GLProcessorMessage {
17    ImageConvert(
18        SendablePtr<TensorDyn>,
19        SendablePtr<TensorDyn>,
20        Rotation,
21        Flip,
22        Crop,
23        tokio::sync::oneshot::Sender<Result<(), Error>>,
24    ),
25    SetColors(
26        Vec<[u8; 4]>,
27        tokio::sync::oneshot::Sender<Result<(), Error>>,
28    ),
29    DrawMasks(
30        SendablePtr<TensorDyn>,
31        SendablePtr<DetectBox>,
32        SendablePtr<Segmentation>,
33        tokio::sync::oneshot::Sender<Result<(), Error>>,
34    ),
35    DrawMasksProto(
36        SendablePtr<TensorDyn>,
37        SendablePtr<DetectBox>,
38        Box<ProtoData>,
39        tokio::sync::oneshot::Sender<Result<(), Error>>,
40    ),
41    SetInt8Interpolation(
42        Int8InterpolationMode,
43        tokio::sync::oneshot::Sender<Result<(), Error>>,
44    ),
45    DecodeMasksAtlas(
46        SendablePtr<DetectBox>,
47        Box<ProtoData>,
48        usize, // output_width
49        usize, // output_height
50        tokio::sync::oneshot::Sender<Result<(Vec<u8>, Vec<MaskRegion>), Error>>,
51    ),
52    PboCreate(
53        usize, // buffer size in bytes
54        tokio::sync::oneshot::Sender<Result<u32, Error>>,
55    ),
56    PboMap(
57        u32,   // buffer_id
58        usize, // size
59        tokio::sync::oneshot::Sender<Result<edgefirst_tensor::PboMapping, Error>>,
60    ),
61    PboUnmap(
62        u32, // buffer_id
63        tokio::sync::oneshot::Sender<Result<(), Error>>,
64    ),
65    PboDelete(u32), // fire-and-forget, no reply
66}
67
/// Implements PboOps by sending commands to the GL thread.
///
/// Holds a `WeakSender` so that PBO images don't keep the GL thread's channel
/// alive. When the `GLProcessorThreaded` is dropped, its `Sender` is the last
/// strong reference — dropping it closes the channel and lets the GL thread
/// exit. After that, `map_buffer` and `unmap_buffer` return
/// `PboDisconnected`, while `delete_buffer` silently does nothing.
struct GlPboOps {
    // Upgraded to a strong sender only for the duration of each operation.
    sender: WeakSender<GLProcessorMessage>,
}
77
78// SAFETY: GlPboOps sends all GL operations to the dedicated GL thread via a
79// channel. `map_buffer` returns a CPU-visible pointer from `glMapBufferRange`
80// that remains valid until `unmap_buffer` calls `glUnmapBuffer` on the GL thread.
81// `delete_buffer` sends a fire-and-forget deletion command to the GL thread.
82unsafe impl edgefirst_tensor::PboOps for GlPboOps {
83    fn map_buffer(
84        &self,
85        buffer_id: u32,
86        size: usize,
87    ) -> edgefirst_tensor::Result<edgefirst_tensor::PboMapping> {
88        let sender = self
89            .sender
90            .upgrade()
91            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
92        let (tx, rx) = tokio::sync::oneshot::channel();
93        sender
94            .blocking_send(GLProcessorMessage::PboMap(buffer_id, size, tx))
95            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
96        rx.blocking_recv()
97            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
98            .map_err(|e| {
99                edgefirst_tensor::Error::NotImplemented(format!("GL PBO map failed: {e:?}"))
100            })
101    }
102
103    fn unmap_buffer(&self, buffer_id: u32) -> edgefirst_tensor::Result<()> {
104        let sender = self
105            .sender
106            .upgrade()
107            .ok_or(edgefirst_tensor::Error::PboDisconnected)?;
108        let (tx, rx) = tokio::sync::oneshot::channel();
109        sender
110            .blocking_send(GLProcessorMessage::PboUnmap(buffer_id, tx))
111            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?;
112        rx.blocking_recv()
113            .map_err(|_| edgefirst_tensor::Error::PboDisconnected)?
114            .map_err(|e| {
115                edgefirst_tensor::Error::NotImplemented(format!("GL PBO unmap failed: {e:?}"))
116            })
117    }
118
119    fn delete_buffer(&self, buffer_id: u32) {
120        if let Some(sender) = self.sender.upgrade() {
121            let _ = sender.blocking_send(GLProcessorMessage::PboDelete(buffer_id));
122        }
123    }
124}
125
/// OpenGL multi-threaded image converter. The actual conversion is done in a
/// separate rendering thread, as OpenGL contexts are not thread-safe. This can
/// be safely sent between threads. The `convert()` call sends the conversion
/// request to the rendering thread and waits for the result.
///
/// Dropping the value closes the request channel and joins the rendering
/// thread.
#[derive(Debug)]
pub struct GLProcessorThreaded {
    // Join handle of the rendering thread.
    // This is only None when the converter is being dropped.
    handle: Option<JoinHandle<()>>,

    // Strong sender for requests to the rendering thread.
    // This is only None when the converter is being dropped.
    sender: Option<Sender<GLProcessorMessage>>,

    // Transfer backend reported by the rendering thread once its GL context
    // was created.
    transfer_backend: TransferBackend,
}
139
// SAFETY: `GLProcessorThreaded` holds only a thread join handle, a channel
// sender and a `TransferBackend` value. The GL processor itself is created
// and used exclusively on the spawned rendering thread and is reached only
// through messages, so no GL state is touched from the threads that own or
// share this struct.
// NOTE(review): if `TransferBackend` and `GLProcessorMessage` are already
// `Send`/`Sync` these manual impls are redundant — confirm and consider
// removing them so the compiler checks the bounds.
unsafe impl Send for GLProcessorThreaded {}
unsafe impl Sync for GLProcessorThreaded {}
142
/// Raw pointer plus element count that may be moved to another thread.
///
/// `ptr` addresses either a single `T` (`len == 1`) or the first element of
/// a slice of `len` elements. The wrapper carries no lifetime, so whoever
/// creates it must keep the pointee alive and un-aliased until the receiving
/// thread has finished using it.
struct SendablePtr<T>
where
    T: Send,
{
    ptr: NonNull<T>,
    len: usize,
}

// SAFETY: the wrapper only transports the address; `T: Send` means the
// pointee itself may be accessed from another thread. Keeping the pointee
// valid while the other thread uses it is the responsibility of the code
// that builds the `SendablePtr`.
unsafe impl<T: Send> Send for SendablePtr<T> {}
149
150impl GLProcessorThreaded {
151    /// Creates a new OpenGL multi-threaded image converter.
152    pub fn new(kind: Option<EglDisplayKind>) -> Result<Self, Error> {
153        let (send, mut recv) = tokio::sync::mpsc::channel::<GLProcessorMessage>(1);
154
155        let (create_ctx_send, create_ctx_recv) = tokio::sync::oneshot::channel();
156
157        let func = move || {
158            let mut gl_converter = match GLProcessorST::new(kind) {
159                Ok(gl) => gl,
160                Err(e) => {
161                    let _ = create_ctx_send.send(Err(e));
162                    return;
163                }
164            };
165            let _ = create_ctx_send.send(Ok(gl_converter.gl_context.transfer_backend));
166            while let Some(msg) = recv.blocking_recv() {
167                match msg {
168                    GLProcessorMessage::ImageConvert(src, mut dst, rotation, flip, crop, resp) => {
169                        // SAFETY: This is safe because the convert() function waits for the resp to
170                        // be sent before dropping the borrow for src and dst
171                        let src = unsafe { src.ptr.as_ref() };
172                        let dst = unsafe { dst.ptr.as_mut() };
173                        let res = gl_converter.convert(src, dst, rotation, flip, crop);
174                        let _ = resp.send(res);
175                    }
176                    GLProcessorMessage::DrawMasks(mut dst, det, seg, resp) => {
177                        // SAFETY: This is safe because the draw_masks() function waits for the
178                        // resp to be sent before dropping the borrow for dst, detect, and
179                        // segmentation
180                        let dst = unsafe { dst.ptr.as_mut() };
181                        let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
182                        let seg = unsafe { std::slice::from_raw_parts(seg.ptr.as_ptr(), seg.len) };
183                        let res = gl_converter.draw_masks(dst, det, seg);
184                        let _ = resp.send(res);
185                    }
186                    GLProcessorMessage::DrawMasksProto(mut dst, det, proto_data, resp) => {
187                        // SAFETY: Same safety invariant as DrawMasks — caller
188                        // blocks on resp before dropping borrows.
189                        let dst = unsafe { dst.ptr.as_mut() };
190                        let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
191                        let res = gl_converter.draw_masks_proto(dst, det, &proto_data);
192                        let _ = resp.send(res);
193                    }
194                    GLProcessorMessage::SetColors(colors, resp) => {
195                        let res = gl_converter.set_class_colors(&colors);
196                        let _ = resp.send(res);
197                    }
198                    GLProcessorMessage::SetInt8Interpolation(mode, resp) => {
199                        gl_converter.set_int8_interpolation_mode(mode);
200                        let _ = resp.send(Ok(()));
201                    }
202                    GLProcessorMessage::DecodeMasksAtlas(
203                        det,
204                        proto_data,
205                        output_width,
206                        output_height,
207                        resp,
208                    ) => {
209                        let det = unsafe { std::slice::from_raw_parts(det.ptr.as_ptr(), det.len) };
210                        let res = gl_converter.decode_masks_atlas(
211                            det,
212                            &proto_data,
213                            output_width,
214                            output_height,
215                        );
216                        let _ = resp.send(res);
217                    }
218                    GLProcessorMessage::PboCreate(size, resp) => {
219                        let result = unsafe {
220                            let mut id: u32 = 0;
221                            gls::gl::GenBuffers(1, &mut id);
222                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, id);
223                            gls::gl::BufferData(
224                                gls::gl::PIXEL_PACK_BUFFER,
225                                size as isize,
226                                std::ptr::null(),
227                                gls::gl::STREAM_COPY,
228                            );
229                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
230                            match check_gl_error("PboCreate", 0) {
231                                Ok(()) => Ok(id),
232                                Err(e) => {
233                                    gls::gl::DeleteBuffers(1, &id);
234                                    Err(e)
235                                }
236                            }
237                        };
238                        let _ = resp.send(result);
239                    }
240                    GLProcessorMessage::PboMap(buffer_id, size, resp) => {
241                        let result = unsafe {
242                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
243                            let ptr = gls::gl::MapBufferRange(
244                                gls::gl::PIXEL_PACK_BUFFER,
245                                0,
246                                size as isize,
247                                gls::gl::MAP_READ_BIT | gls::gl::MAP_WRITE_BIT,
248                            );
249                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
250                            if ptr.is_null() {
251                                Err(crate::Error::OpenGl(
252                                    "glMapBufferRange returned null".to_string(),
253                                ))
254                            } else {
255                                Ok(edgefirst_tensor::PboMapping {
256                                    ptr: ptr as *mut u8,
257                                    size,
258                                })
259                            }
260                        };
261                        let _ = resp.send(result);
262                    }
263                    GLProcessorMessage::PboUnmap(buffer_id, resp) => {
264                        let result = unsafe {
265                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, buffer_id);
266                            let ok = gls::gl::UnmapBuffer(gls::gl::PIXEL_PACK_BUFFER);
267                            gls::gl::BindBuffer(gls::gl::PIXEL_PACK_BUFFER, 0);
268                            if ok == gls::gl::FALSE {
269                                Err(Error::OpenGl(
270                                    "PBO data was corrupted during mapping".into(),
271                                ))
272                            } else {
273                                check_gl_error("PboUnmap", 0)
274                            }
275                        };
276                        let _ = resp.send(result);
277                    }
278                    GLProcessorMessage::PboDelete(buffer_id) => unsafe {
279                        gls::gl::DeleteBuffers(1, &buffer_id);
280                    },
281                }
282            }
283        };
284
285        // let handle = tokio::task::spawn(func());
286        let handle = std::thread::spawn(func);
287
288        let transfer_backend = match create_ctx_recv.blocking_recv() {
289            Ok(Err(e)) => return Err(e),
290            Err(_) => {
291                return Err(Error::Internal(
292                    "GL converter error messaging closed without update".to_string(),
293                ));
294            }
295            Ok(Ok(tb)) => tb,
296        };
297
298        Ok(Self {
299            handle: Some(handle),
300            sender: Some(send),
301            transfer_backend,
302        })
303    }
304}
305
306impl ImageProcessorTrait for GLProcessorThreaded {
307    fn convert(
308        &mut self,
309        src: &TensorDyn,
310        dst: &mut TensorDyn,
311        rotation: crate::Rotation,
312        flip: Flip,
313        crop: Crop,
314    ) -> crate::Result<()> {
315        let (err_send, err_recv) = tokio::sync::oneshot::channel();
316        self.sender
317            .as_ref()
318            .unwrap()
319            .blocking_send(GLProcessorMessage::ImageConvert(
320                SendablePtr {
321                    ptr: NonNull::from(src),
322                    len: 1,
323                },
324                SendablePtr {
325                    ptr: NonNull::from(dst),
326                    len: 1,
327                },
328                rotation,
329                flip,
330                crop,
331                err_send,
332            ))
333            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
334        err_recv.blocking_recv().map_err(|_| {
335            Error::Internal("GL converter error messaging closed without update".to_string())
336        })?
337    }
338
339    fn draw_masks(
340        &mut self,
341        dst: &mut TensorDyn,
342        detect: &[crate::DetectBox],
343        segmentation: &[crate::Segmentation],
344    ) -> crate::Result<()> {
345        let (err_send, err_recv) = tokio::sync::oneshot::channel();
346        self.sender
347            .as_ref()
348            .unwrap()
349            .blocking_send(GLProcessorMessage::DrawMasks(
350                SendablePtr {
351                    ptr: NonNull::from(dst),
352                    len: 1,
353                },
354                SendablePtr {
355                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
356                    len: detect.len(),
357                },
358                SendablePtr {
359                    ptr: NonNull::new(segmentation.as_ptr() as *mut Segmentation).unwrap(),
360                    len: segmentation.len(),
361                },
362                err_send,
363            ))
364            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
365        err_recv.blocking_recv().map_err(|_| {
366            Error::Internal("GL converter error messaging closed without update".to_string())
367        })?
368    }
369
370    fn draw_masks_proto(
371        &mut self,
372        dst: &mut TensorDyn,
373        detect: &[DetectBox],
374        proto_data: &ProtoData,
375    ) -> crate::Result<()> {
376        let (err_send, err_recv) = tokio::sync::oneshot::channel();
377        self.sender
378            .as_ref()
379            .unwrap()
380            .blocking_send(GLProcessorMessage::DrawMasksProto(
381                SendablePtr {
382                    ptr: NonNull::from(dst),
383                    len: 1,
384                },
385                SendablePtr {
386                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
387                    len: detect.len(),
388                },
389                Box::new(proto_data.clone()),
390                err_send,
391            ))
392            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
393        err_recv.blocking_recv().map_err(|_| {
394            Error::Internal("GL converter error messaging closed without update".to_string())
395        })?
396    }
397
398    fn decode_masks_atlas(
399        &mut self,
400        detect: &[DetectBox],
401        proto_data: ProtoData,
402        output_width: usize,
403        output_height: usize,
404    ) -> crate::Result<(Vec<u8>, Vec<MaskRegion>)> {
405        GLProcessorThreaded::decode_masks_atlas(
406            self,
407            detect,
408            proto_data,
409            output_width,
410            output_height,
411        )
412    }
413
414    fn set_class_colors(&mut self, colors: &[[u8; 4]]) -> Result<(), crate::Error> {
415        let (err_send, err_recv) = tokio::sync::oneshot::channel();
416        self.sender
417            .as_ref()
418            .unwrap()
419            .blocking_send(GLProcessorMessage::SetColors(colors.to_vec(), err_send))
420            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
421        err_recv.blocking_recv().map_err(|_| {
422            Error::Internal("GL converter error messaging closed without update".to_string())
423        })?
424    }
425}
426
427impl GLProcessorThreaded {
428    /// Sets the interpolation mode for int8 proto textures.
429    pub fn set_int8_interpolation_mode(
430        &mut self,
431        mode: Int8InterpolationMode,
432    ) -> Result<(), crate::Error> {
433        let (err_send, err_recv) = tokio::sync::oneshot::channel();
434        self.sender
435            .as_ref()
436            .unwrap()
437            .blocking_send(GLProcessorMessage::SetInt8Interpolation(mode, err_send))
438            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
439        err_recv.blocking_recv().map_err(|_| {
440            Error::Internal("GL converter error messaging closed without update".to_string())
441        })?
442    }
443
444    /// Decode all detection masks into a compact atlas via the GL thread.
445    ///
446    /// Returns `(atlas_pixels, regions)` where `atlas_pixels` is a contiguous
447    /// `Vec<u8>` of shape `[atlas_h, output_width]` (compact, bbox-sized strips)
448    /// and `regions` describes each detection's location within the atlas.
449    pub fn decode_masks_atlas(
450        &mut self,
451        detect: &[DetectBox],
452        proto_data: ProtoData,
453        output_width: usize,
454        output_height: usize,
455    ) -> Result<(Vec<u8>, Vec<MaskRegion>), crate::Error> {
456        let (resp_send, resp_recv) = tokio::sync::oneshot::channel();
457        self.sender
458            .as_ref()
459            .unwrap()
460            .blocking_send(GLProcessorMessage::DecodeMasksAtlas(
461                SendablePtr {
462                    ptr: NonNull::new(detect.as_ptr() as *mut DetectBox).unwrap(),
463                    len: detect.len(),
464                },
465                Box::new(proto_data),
466                output_width,
467                output_height,
468                resp_send,
469            ))
470            .map_err(|_| Error::Internal("GL converter thread exited".to_string()))?;
471        resp_recv.blocking_recv().map_err(|_| {
472            Error::Internal("GL converter error messaging closed without update".to_string())
473        })?
474    }
475
476    /// Create a PBO-backed [`Tensor<u8>`] image on the GL thread.
477    pub fn create_pbo_image(
478        &self,
479        width: usize,
480        height: usize,
481        format: edgefirst_tensor::PixelFormat,
482    ) -> Result<edgefirst_tensor::Tensor<u8>, Error> {
483        let sender = self
484            .sender
485            .as_ref()
486            .ok_or(Error::OpenGl("GL processor is shutting down".to_string()))?;
487
488        let channels = format.channels();
489        let size = match format.layout() {
490            edgefirst_tensor::PixelLayout::SemiPlanar => {
491                // NV12: W*H*3/2, NV16: W*H*2
492                match format {
493                    edgefirst_tensor::PixelFormat::Nv12 => width * height * 3 / 2,
494                    edgefirst_tensor::PixelFormat::Nv16 => width * height * 2,
495                    _ => width * height * channels,
496                }
497            }
498            edgefirst_tensor::PixelLayout::Packed | edgefirst_tensor::PixelLayout::Planar => {
499                width * height * channels
500            }
501            _ => width * height * channels,
502        };
503        if size == 0 {
504            return Err(Error::OpenGl("Invalid image dimensions".to_string()));
505        }
506
507        // Allocate PBO on the GL thread
508        let (tx, rx) = tokio::sync::oneshot::channel();
509        sender
510            .blocking_send(GLProcessorMessage::PboCreate(size, tx))
511            .map_err(|_| Error::OpenGl("GL thread channel closed".to_string()))?;
512        let buffer_id = rx
513            .blocking_recv()
514            .map_err(|_| Error::OpenGl("GL thread did not respond".to_string()))??;
515
516        let ops: std::sync::Arc<dyn edgefirst_tensor::PboOps> = std::sync::Arc::new(GlPboOps {
517            sender: sender.downgrade(),
518        });
519
520        let shape = match format.layout() {
521            edgefirst_tensor::PixelLayout::Planar => vec![channels, height, width],
522            edgefirst_tensor::PixelLayout::SemiPlanar => {
523                let total_h = match format {
524                    edgefirst_tensor::PixelFormat::Nv12 => height * 3 / 2,
525                    edgefirst_tensor::PixelFormat::Nv16 => height * 2,
526                    _ => height * 2,
527                };
528                vec![total_h, width]
529            }
530            _ => vec![height, width, channels],
531        };
532
533        let pbo_tensor =
534            edgefirst_tensor::PboTensor::<u8>::from_pbo(buffer_id, size, &shape, None, ops)
535                .map_err(|e| Error::OpenGl(format!("PBO tensor creation failed: {e:?}")))?;
536        let mut tensor = edgefirst_tensor::Tensor::from_pbo(pbo_tensor);
537        tensor
538            .set_format(format)
539            .map_err(|e| Error::OpenGl(format!("Failed to set format on PBO tensor: {e:?}")))?;
540        Ok(tensor)
541    }
542
543    /// Returns the active transfer backend.
544    pub(crate) fn transfer_backend(&self) -> TransferBackend {
545        self.transfer_backend
546    }
547}
548
549impl Drop for GLProcessorThreaded {
550    fn drop(&mut self) {
551        drop(self.sender.take());
552        let _ = self.handle.take().and_then(|h| h.join().ok());
553    }
554}