unit_rs/
unit.rs

1use std::any::Any;
2use std::panic::AssertUnwindSafe;
3use std::ptr::NonNull;
4use std::sync::{Arc, Mutex, MutexGuard, Once, Weak};
5
6use libc::c_void;
7
8use crate::error::{UnitError, UnitInitError, UnitResult};
9use crate::nxt_unit::{
10    self, nxt_unit_ctx_t, nxt_unit_done, nxt_unit_init, nxt_unit_init_t, nxt_unit_request_done,
11    nxt_unit_request_info_t, nxt_unit_response_init, nxt_unit_run,
12};
13use crate::request::Request;
14
15unsafe extern "C" fn request_handler(req: *mut nxt_unit_request_info_t) {
16    // SAFETY: The context data is passed as Unit context-specific user data,
17    // and individual Unit contexts correspond to individual threads.
18    let context_data = (*(*req).ctx).data as *mut ContextData;
19    let context_data = &mut *context_data;
20
21    let rc = nxt_unit_response_init(req, 200, 1, 0 as u32);
22
23    if rc != nxt_unit::NXT_UNIT_OK as i32 {
24        nxt_unit_request_done(req, rc);
25        return;
26    }
27
28    let rc = if let Some(service) = &mut context_data.request_handler {
29        let unit_request = Request {
30            nxt_request: &mut *req,
31            _lifetime: Default::default(),
32        };
33
34        // This assertion is safe because the panic payload is not examined, and
35        // the panic will just be forwarded through Unit's C FFI and resumed.
36        let handler = AssertUnwindSafe(|| service.handle_request(unit_request));
37
38        match std::panic::catch_unwind(handler) {
39            Ok(Ok(())) => nxt_unit::NXT_UNIT_OK as i32,
40            Ok(Err(UnitError(rc))) => rc,
41            Err(panic_payload) => {
42                nxt_unit_request_done(req, nxt_unit::NXT_UNIT_ERROR as i32);
43
44                // FIXME: Find a way to stop the run loop
45                // Trying to implement `nxt_unit_run` manually is not possible
46                // since `nxt_unit_quit` is not exposed in the API.
47                std::panic::resume_unwind(panic_payload)
48                // context_data.panic_payload = Some(panic_payload);
49                // nxt_unit::NXT_UNIT_ERROR as i32
50            }
51        }
52    } else {
53        nxt_unit::NXT_UNIT_OK as i32
54    };
55
56    nxt_unit_request_done(req, rc);
57}
58
59struct ContextData {
60    request_handler: Option<Box<dyn UnitService>>,
61    unit_is_ready: bool,
62    panic_payload: Option<Box<dyn Any + Send>>,
63}
64
65unsafe extern "C" fn ready_handler(ctx: *mut nxt_unit_ctx_t) -> i32 {
66    // SAFETY: This is only ever called once, in the main thread, while no other
67    // main thread handlers are running.
68    let context_data = (*ctx).data as *mut ContextData;
69    let context_data = &mut *context_data;
70
71    context_data.unit_is_ready = true;
72
73    nxt_unit::NXT_UNIT_OK as i32
74}
75
76static mut MAIN_CONTEXT: Option<Mutex<MainContext>> = None;
77static MAIN_CONTEXT_INIT: Once = Once::new();
78
79enum MainContext {
80    Uninitialized,
81    InitFailed(UnitInitError),
82    Initialized(Weak<UnitContextWrapper>),
83}
84
85fn main_context() -> MutexGuard<'static, MainContext> {
86    unsafe {
87        MAIN_CONTEXT_INIT.call_once(|| {
88            MAIN_CONTEXT = Some(Mutex::new(MainContext::Uninitialized));
89        });
90        MAIN_CONTEXT
91            .as_ref()
92            .expect("Initialized above")
93            .lock()
94            .expect("Main context should not be poisoned")
95    }
96}
97
98/// The Unit application context.
99///
100/// This object wraps the `libunit` library, which talks to the Unit server over
101/// shared memory and a unix socket in order to receive data about requests.
102///
103/// This object is not [`Send`] nor [`Sync`], and cannot be sent to other
104/// threads.
105///
106/// However, multiple objects of this type may be created; additional [`Unit`]
107/// objects will automatically be linked to the first through a global mutex,
108/// and will be able to receive and process requests in other threads.
109///
110/// The first context is internally wrapped in an [`Arc`], shared among all
111/// instances [`Unit`] and will be deallocated when the last [`Unit`] object is
112/// dropped.
113pub struct Unit {
114    context_wrapper: Option<Arc<UnitContextWrapper>>,
115    context_data: *mut ContextData,
116}
117
118impl Unit {
119    /// Create a new Unit context capable of receiving and handling requests on
120    /// the current thread.
121    ///
122    /// If called after a previous [`Unit`] was constructed but already received
123    /// a QUIT event from the Unit server, this will return a no-op [`Unit`]
124    /// instance whose [`Unit::run`] method will immediately return.
125    ///
126    /// If called after a previous [`Unit`] failed to initialize, this will
127    /// return the same initialization failure.
128    pub fn new() -> Result<Self, UnitInitError> {
129        let mut main_context = main_context();
130
131        let main_unit_context = match &*main_context {
132            MainContext::InitFailed(UnitInitError) => {
133                return Err(UnitInitError);
134            }
135            MainContext::Uninitialized => None,
136            MainContext::Initialized(main_unit_context) => {
137                match main_unit_context.upgrade() {
138                    Some(context) => Some(context),
139                    None => {
140                        // The main thread already exited; fast-track all future threads to
141                        // exit as well.
142                        return Ok(Self {
143                            context_wrapper: None,
144                            context_data: std::ptr::null_mut(),
145                        });
146                    }
147                }
148            }
149        };
150
151        if let Some(main_unit_context) = main_unit_context {
152            // Additional contexts are created from the first.
153
154            let context_data = Box::new(ContextData {
155                request_handler: None,
156                unit_is_ready: false,
157                panic_payload: None,
158            });
159
160            let context_user_data = Box::into_raw(context_data);
161
162            let ctx = unsafe {
163                nxt_unit::nxt_unit_ctx_alloc(
164                    main_unit_context.context.as_ptr(),
165                    context_user_data as *mut c_void,
166                )
167            };
168
169            let ctx = match NonNull::new(ctx) {
170                Some(ctx) => ctx,
171                None => {
172                    return Err(UnitInitError);
173                }
174            };
175
176            let context_wrapper = UnitContextWrapper {
177                parent_context: Some(main_unit_context.clone()),
178                context: ctx,
179            };
180
181            Ok(Self {
182                context_wrapper: Some(Arc::new(context_wrapper)),
183                context_data: context_user_data,
184            })
185        } else {
186            // First context ever created.
187
188            let context_data = Box::new(ContextData {
189                request_handler: None,
190                unit_is_ready: false,
191                panic_payload: None,
192            });
193
194            let context_user_data = Box::into_raw(context_data);
195
196            let ctx = unsafe {
197                let mut init: nxt_unit_init_t = std::mem::zeroed();
198                init.callbacks.request_handler = Some(request_handler);
199                init.callbacks.ready_handler = Some(ready_handler);
200
201                init.ctx_data = context_user_data as *mut c_void;
202
203                nxt_unit_init(&mut init)
204            };
205
206            let ctx = match NonNull::new(ctx) {
207                Some(ctx) => ctx,
208                None => {
209                    *main_context = MainContext::InitFailed(UnitInitError);
210                    return Err(UnitInitError);
211                }
212            };
213
214            // Run until the ready handler is called.
215            loop {
216                let rc = unsafe { nxt_unit::nxt_unit_run_once(ctx.as_ptr()) };
217
218                if rc != nxt_unit::NXT_UNIT_OK as i32 {
219                    *main_context = MainContext::InitFailed(UnitInitError);
220                    return Err(UnitInitError);
221                }
222
223                // Check if the ready handler was called.
224                // SAFETY: This data is thread-specific, and not shared
225                // anywhere.
226                unsafe {
227                    let context_data = (*ctx.as_ptr()).data as *mut ContextData;
228                    let context_data = &mut *context_data;
229
230                    if context_data.unit_is_ready {
231                        break;
232                    }
233                }
234            }
235
236            let context_wrapper = Arc::new(UnitContextWrapper {
237                parent_context: None,
238                context: ctx,
239            });
240
241            // Keep a global weak reference to this, other Unit contexts will be
242            // spawned from it.
243            *main_context = MainContext::Initialized(Arc::downgrade(&context_wrapper));
244
245            Ok(Self {
246                context_wrapper: Some(context_wrapper),
247                context_data: context_user_data,
248            })
249        }
250    }
251
252    fn context_data_mut(&mut self) -> &mut ContextData {
253        // SAFETY: The only other thing that can access this is `.run()`, which
254        // requires `&mut self` and therefore guaranteed not to be active.
255        unsafe { &mut *self.context_data }
256    }
257
258    /// Set a request handler for the Unit application.
259    ///
260    /// The handler must be an object that implements the [`UnitService`] trait.
261    ///
262    /// This trait is automatically implemented for functions or lambda
263    /// functions  that take a [`Request`] object and return a
264    /// [`UnitResult<()>`](UnitResult).
265    pub fn set_request_handler(&mut self, f: impl UnitService + 'static) {
266        if self.context_wrapper.is_none() {
267            return;
268        }
269        self.context_data_mut().request_handler = Some(Box::new(f))
270    }
271
272    /// Enter the main event loop, handling requests on this thread until the
273    /// Unit server exits or requests a restart.
274    ///
275    /// This may be executed in parallel with other threads that call
276    /// [`Unit::run()`]
277    pub fn run(&mut self) {
278        if let Some(context_wrapper) = &self.context_wrapper {
279            // SAFETY: Call via FFI into Unit's main loop. It will call back into
280            // Rust code using callbacks, which must use catch_unwind to be
281            // FFI-safe.
282            unsafe {
283                nxt_unit_run(context_wrapper.context.as_ptr());
284            }
285
286            // Resume any panics forwarded through the C FFI.
287            // TODO: This is not yet functional, see catch_unwind above.
288            if let Some(panic_payload) = self.context_data_mut().panic_payload.take() {
289                std::panic::resume_unwind(panic_payload);
290            }
291        }
292    }
293}
294
295// A wrapper over Unit's context that deallocates the context when dropped.
296struct UnitContextWrapper {
297    parent_context: Option<Arc<UnitContextWrapper>>,
298    context: NonNull<nxt_unit_ctx_t>,
299}
300
301impl Drop for UnitContextWrapper {
302    fn drop(&mut self) {
303        // The order here is important. Secondary contexts are created from a
304        // main context, which must be dropped last.
305
306        // SAFETY: This structure is only ever held in an Arc, meaning that this
307        // is the last instance of it, and it's being dropped.
308        unsafe {
309            nxt_unit_done(self.context.as_ptr());
310        }
311
312        // This is an Arc, which may or may not call the parent's drop.
313        drop(self.parent_context.take());
314    }
315}
316
317impl Drop for Unit {
318    fn drop(&mut self) {
319        // SAFETY: This structure is the only owner of the box, and is being
320        // dropped, therefore not currently being shared.
321        unsafe {
322            drop(Box::from_raw(self.context_data));
323        }
324
325        // Note: Everything that uses the contex must be dropped before this.
326        drop(self.context_wrapper.take());
327    }
328}
329
330/// A trait that can be implemented by request handlers to be used with
331/// [`Unit::set_request_handler()`].
332///
333/// This trait is automatically implemented for functions or lambda functions
334/// that take a [`Request`] object and return a [`UnitResult<()>`](UnitResult).
335pub trait UnitService {
336    fn handle_request(&mut self, req: Request) -> UnitResult<()>;
337}
338
339impl<F> UnitService for F
340where
341    F: FnMut(Request) -> UnitResult<()> + 'static,
342{
343    fn handle_request(&mut self, req: Request) -> UnitResult<()> {
344        self(req)
345    }
346}