use crate::{
    base::DEFAULT_TIMEOUT,
    common::*,
    config::Config,
    context::Context,
    error::{Error as RsError, ErrorChecker, Result},
    frame::{CompositeFrame, Frame, GenericFrameEx},
    pipeline_kind::{self, PipelineState},
    pipeline_profile::PipelineProfile,
};
#[derive(Debug)]
pub struct Pipeline<State>
where
    State: pipeline_kind::PipelineState,
{
    ptr: NonNull<sys::rs2_pipeline>,
    context: Context,
    state: State,
}
pub type InactivePipeline = Pipeline<pipeline_kind::Inactive>;
pub type ActivePipeline = Pipeline<pipeline_kind::Active>;
impl InactivePipeline {
    
    pub fn new() -> Result<Self> {
        let context = Context::new()?;
        let pipeline = Self::from_context(context)?;
        Ok(pipeline)
    }
    
    pub fn from_context(context: Context) -> Result<Self> {
        let ptr = {
            let mut checker = ErrorChecker::new();
            let ptr =
                unsafe { sys::rs2_create_pipeline(context.ptr.as_ptr(), checker.inner_mut_ptr()) };
            checker.check()?;
            ptr
        };
        let pipeline = Self {
            ptr: NonNull::new(ptr).unwrap(),
            context,
            state: pipeline_kind::Inactive,
        };
        Ok(pipeline)
    }
    
    
    
    pub fn start(self, config: impl Into<Option<Config>>) -> Result<ActivePipeline> {
        let config = config.into();
        let ptr = match &config {
            Some(conf) => unsafe {
                let mut checker = ErrorChecker::new();
                let ptr = sys::rs2_pipeline_start_with_config(
                    self.ptr.as_ptr(),
                    conf.ptr.as_ptr(),
                    checker.inner_mut_ptr(),
                );
                checker.check()?;
                ptr
            },
            None => unsafe {
                let mut checker = ErrorChecker::new();
                let ptr = sys::rs2_pipeline_start(self.ptr.as_ptr(), checker.inner_mut_ptr());
                checker.check()?;
                ptr
            },
        };
        let profile = unsafe { PipelineProfile::from_raw(ptr) };
        let pipeline = {
            let (pipeline_ptr, context_ptr) = self.into_raw_parts();
            Pipeline {
                ptr: NonNull::new(pipeline_ptr).unwrap(),
                context: unsafe { Context::from_raw(context_ptr) },
                state: pipeline_kind::Active { profile, config },
            }
        };
        Ok(pipeline)
    }
    
    pub async fn start_async(self, config: impl Into<Option<Config>>) -> Result<ActivePipeline> {
        let config = config.into();
        let pipeline_ptr = AtomicPtr::new(self.ptr.as_ptr());
        let config_ptr_opt = config
            .as_ref()
            .map(|conf| AtomicPtr::new(conf.ptr.as_ptr()));
        let (tx, rx) = futures::channel::oneshot::channel();
        
        thread::spawn(move || {
            let func = || unsafe {
                let profile_ptr = match config_ptr_opt {
                    Some(config_ptr) => {
                        let mut checker = ErrorChecker::new();
                        let ptr = sys::rs2_pipeline_start_with_config(
                            pipeline_ptr.load(Ordering::SeqCst),
                            config_ptr.load(Ordering::SeqCst),
                            checker.inner_mut_ptr(),
                        );
                        checker.check()?;
                        ptr
                    }
                    None => {
                        let mut checker = ErrorChecker::new();
                        let ptr = sys::rs2_pipeline_start(
                            pipeline_ptr.load(Ordering::SeqCst),
                            checker.inner_mut_ptr(),
                        );
                        checker.check()?;
                        ptr
                    }
                };
                Ok(AtomicPtr::new(profile_ptr))
            };
            let result = func();
            let _ = tx.send(result);
        });
        let profile_ptr = rx.await.unwrap()?;
        let profile = unsafe { PipelineProfile::from_raw(profile_ptr.load(Ordering::SeqCst)) };
        let pipeline = {
            let (pipeline_ptr, context_ptr) = self.into_raw_parts();
            Pipeline {
                ptr: NonNull::new(pipeline_ptr).unwrap(),
                context: unsafe { Context::from_raw(context_ptr) },
                state: pipeline_kind::Active { profile, config },
            }
        };
        Ok(pipeline)
    }
    
    
    
    pub fn into_raw_parts(self) -> (*mut sys::rs2_pipeline, *mut sys::rs2_context) {
        
        let ptr = self.ptr;
        let context = unsafe { self.context.unsafe_clone().into_raw() };
        mem::forget(self);
        (ptr.as_ptr(), context)
    }
    
    
    
    pub unsafe fn from_raw_parts(
        pipeline_ptr: *mut sys::rs2_pipeline,
        context_ptr: *mut sys::rs2_context,
    ) -> Self {
        let context = Context::from_raw(context_ptr);
        Self {
            ptr: NonNull::new(pipeline_ptr).unwrap(),
            context,
            state: pipeline_kind::Inactive,
        }
    }
}
impl ActivePipeline {
    
    pub fn profile(&self) -> &PipelineProfile {
        &self.state.profile
    }
    
    
    
    
    
    
    pub fn wait(&mut self, timeout: impl Into<Option<Duration>>) -> Result<Option<CompositeFrame>> {
        let timeout = timeout.into();
        let timeout_ms = timeout.unwrap_or(DEFAULT_TIMEOUT).as_millis() as c_uint;
        let frame = loop {
            let mut checker = ErrorChecker::new();
            let ptr = unsafe {
                sys::rs2_pipeline_wait_for_frames(
                    self.ptr.as_ptr(),
                    timeout_ms,
                    checker.inner_mut_ptr(),
                )
            };
            match (timeout, checker.check()) {
                (None, Err(RsError::Timeout(_))) => continue,
                (Some(_), Err(RsError::Timeout(_))) => {
                    return Ok(None);
                }
                (_, result) => result?,
            }
            let frame = unsafe { Frame::from_raw(ptr) };
            break frame;
        };
        Ok(Some(frame))
    }
    
    
    
    
    pub fn try_wait(&mut self) -> Result<Option<CompositeFrame>> {
        unsafe {
            let mut checker = ErrorChecker::new();
            let mut ptr: *mut sys::rs2_frame = ptr::null_mut();
            let ret = sys::rs2_pipeline_poll_for_frames(
                self.ptr.as_ptr(),
                &mut ptr as *mut _,
                checker.inner_mut_ptr(),
            );
            if let Err(err) = checker.check() {
                return Err(err);
            }
            if ret != 0 {
                let frame = Frame::from_raw(ptr);
                Ok(Some(frame))
            } else {
                Ok(None)
            }
        }
    }
    
    
    
    
    
    
    
    
    pub async fn wait_async(
        &mut self,
        timeout: impl Into<Option<Duration>>,
    ) -> Result<Option<CompositeFrame>> {
        let timeout = timeout.into();
        let timeout_ms = timeout
            .map(|duration| duration.as_millis() as c_uint)
            .unwrap_or(sys::RS2_DEFAULT_TIMEOUT as c_uint);
        let (tx, rx) = futures::channel::oneshot::channel();
        let pipeline_ptr = AtomicPtr::new(self.ptr.as_ptr());
        thread::spawn(move || {
            let result = unsafe {
                loop {
                    let mut checker = ErrorChecker::new();
                    let ptr = sys::rs2_pipeline_wait_for_frames(
                        pipeline_ptr.load(Ordering::Relaxed),
                        timeout_ms,
                        checker.inner_mut_ptr(),
                    );
                    let result = match (timeout, checker.check()) {
                        (None, Err(RsError::Timeout(_))) => continue,
                        (Some(_), Err(RsError::Timeout(_))) => Ok(None),
                        (_, result) => result.map(|_| Some(Frame::from_raw(ptr))),
                    };
                    break result;
                }
            };
            let _ = tx.send(result);
        });
        let frame = rx.await.unwrap()?;
        Ok(frame)
    }
    
    
    
    pub fn stop(self) -> Result<InactivePipeline> {
        unsafe {
            let mut checker = ErrorChecker::new();
            sys::rs2_pipeline_stop(self.ptr.as_ptr(), checker.inner_mut_ptr());
            checker.check()?;
        }
        let pipeline = {
            let (pipeline_ptr, context_ptr, profile_ptr, config_ptr) = self.into_raw_parts();
            mem::drop(unsafe { pipeline_kind::Active::from_raw_parts(profile_ptr, config_ptr) });
            Pipeline {
                ptr: NonNull::new(pipeline_ptr).unwrap(),
                context: unsafe { Context::from_raw(context_ptr) },
                state: pipeline_kind::Inactive,
            }
        };
        Ok(pipeline)
    }
    
    
    
    pub fn into_raw_parts(
        self,
    ) -> (
        *mut sys::rs2_pipeline,
        *mut sys::rs2_context,
        *mut sys::rs2_pipeline_profile,
        Option<*mut sys::rs2_config>,
    ) {
        
        let ptr = self.ptr;
        let context = unsafe { self.context.unsafe_clone().into_raw() };
        let (pipeline_profile, config) = unsafe { self.state.unsafe_clone().into_raw_parts() };
        mem::forget(self);
        (ptr.as_ptr(), context, pipeline_profile, config)
    }
    
    
    
    
    pub unsafe fn from_raw_parts(
        pipeline_ptr: *mut sys::rs2_pipeline,
        context_ptr: *mut sys::rs2_context,
        profile_ptr: *mut sys::rs2_pipeline_profile,
        config_ptr: Option<*mut sys::rs2_config>,
    ) -> Self {
        let context = Context::from_raw(context_ptr);
        let state = pipeline_kind::Active::from_raw_parts(profile_ptr, config_ptr);
        Self {
            ptr: NonNull::new(pipeline_ptr).unwrap(),
            context,
            state,
        }
    }
}
impl<State> Drop for Pipeline<State>
where
    State: pipeline_kind::PipelineState,
{
    fn drop(&mut self) {
        unsafe {
            sys::rs2_delete_pipeline(self.ptr.as_ptr());
        }
    }
}
unsafe impl<State> Send for Pipeline<State> where State: pipeline_kind::PipelineState {}