use std::fmt;
use std::marker;
use std::ptr;
use std::sync::Arc;
use std::time::Duration;
use libc::{c_char, c_int, c_long, c_short, c_void};
#[cfg(unix)]
use libc::{pollfd, POLLIN, POLLOUT, POLLPRI};
use crate::easy::{Easy, Easy2, List};
use crate::panic;
use crate::{Error, MultiError};
/// Owner of a libcurl multi handle (`CURLM`) together with the Rust
/// callbacks registered on it via `socket_function` and `timer_function`.
pub struct Multi {
// Reference-counted so that `DetachGuard` (strong reference) and
// `MultiWaker` (weak reference) can refer to the same raw handle.
raw: Arc<RawMulti>,
// Boxed callback storage; a pointer to this box is handed to libcurl as the
// callback user data by `_socket_function` and `_timer_function`.
data: Box<MultiData>,
}
/// Thin owner of the raw `CURLM` pointer.
///
/// Its `Drop` implementation calls `curl_multi_cleanup` on `handle`.
#[derive(Debug)]
struct RawMulti {
// Pointer returned by `curl_multi_init` (asserted non-null in `Multi::new`).
handle: *mut curl_sys::CURLM,
}
/// Storage for the user callbacks that the `extern "C"` trampolines invoke.
struct MultiData {
// Invoked with the socket, the event bits wrapped in `SocketEvents`, and the
// per-socket pointer value converted to `usize`.
socket: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
// Invoked with `None` when libcurl reports a timeout of -1, otherwise with
// the timeout in milliseconds as a `Duration`; the returned `bool` selects
// the trampoline's return code (0 for `true`, -1 for `false`).
timer: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
}
/// A message read from a multi handle's queue.
///
/// Instances are created by `Multi::messages`, which wraps each non-null
/// pointer returned by `curl_multi_info_read`.
pub struct Message<'multi> {
// Raw message pointer as returned by `curl_multi_info_read`.
ptr: *mut curl_sys::CURLMsg,
// Borrow of the originating `Multi`; ties this message's lifetime to it.
_multi: &'multi Multi,
}
/// An `Easy` handle that has been added to a `Multi` via `Multi::add`.
///
/// Pass it back to `Multi::remove` to recover the underlying `Easy`.
pub struct EasyHandle {
// Declared before `easy` so that it is dropped first: the guard's `Drop`
// calls `curl_multi_remove_handle` before the easy handle itself is dropped.
guard: DetachGuard,
easy: Easy,
// NOTE(review): presumably present to make this type inherit `Multi`'s
// auto-trait (Send/Sync) properties -- confirm intent.
_marker: marker::PhantomData<&'static Multi>,
}
/// An `Easy2<H>` handle that has been added to a `Multi` via `Multi::add2`.
///
/// Pass it back to `Multi::remove2` to recover the underlying `Easy2<H>`.
pub struct Easy2Handle<H> {
// Declared before `easy` so that it is dropped first: the guard's `Drop`
// calls `curl_multi_remove_handle` before the easy handle itself is dropped.
guard: DetachGuard,
easy: Easy2<H>,
// NOTE(review): presumably present to make this type inherit `Multi`'s
// auto-trait (Send/Sync) properties -- confirm intent.
_marker: marker::PhantomData<&'static Multi>,
}
/// Removes an easy handle from its multi handle, either explicitly through
/// `detach` or automatically when the guard is dropped.
struct DetachGuard {
// Strong reference to the raw multi handle the easy handle was added to.
multi: Arc<RawMulti>,
// Raw easy handle to remove; set to null by `detach` after a successful
// removal so that a second call does nothing.
easy: *mut curl_sys::CURL,
}
/// Set of `CURL_CSELECT_*` flags passed to `Multi::action`.
///
/// Built with `Events::new` and the `input` / `output` / `error` setters.
pub struct Events {
// Bitwise OR of the selected `CURL_CSELECT_*` constants.
bits: c_int,
}
/// Event value delivered to the callback registered with
/// `Multi::socket_function`.
pub struct SocketEvents {
// The raw `what` argument libcurl passed to the socket callback; queried
// against the `CURL_POLL_*` constants by the accessor methods.
bits: c_int,
}
/// Socket type used throughout this module; an alias for
/// `curl_sys::curl_socket_t`.
pub type Socket = curl_sys::curl_socket_t;
/// Extra file descriptor to watch in `Multi::wait` / `Multi::poll`.
///
/// Wraps a `curl_sys::curl_waitfd`.
pub struct WaitFd {
// Holds the descriptor (`fd`), the requested events (`events`) and the
// reported events (`revents`).
inner: curl_sys::curl_waitfd,
}
/// Handle obtained from `Multi::waker` whose `wakeup` method calls
/// `curl_multi_wakeup` on the originating multi handle.
#[cfg(feature = "poll_7_68_0")]
#[derive(Debug, Clone)]
pub struct MultiWaker {
// Weak reference: the waker does not keep the multi handle alive, and
// `wakeup` fails once the handle has been dropped.
raw: std::sync::Weak<RawMulti>,
}
// `RawMulti` holds a raw pointer, so `MultiWaker` is not automatically `Send`
// or `Sync`; these impls opt in manually.
// NOTE(review): soundness presumably relies on `curl_multi_wakeup` being safe
// to call from any thread while the multi handle is in use elsewhere --
// confirm against the libcurl documentation.
#[cfg(feature = "poll_7_68_0")]
unsafe impl Send for MultiWaker {}
#[cfg(feature = "poll_7_68_0")]
unsafe impl Sync for MultiWaker {}
impl Multi {
/// Creates a new multi handle.
///
/// Runs `crate::init()` and then `curl_multi_init`. The socket callback
/// starts out as a no-op and the timer callback as a stub returning `true`.
///
/// # Panics
///
/// Panics if `curl_multi_init` returns a null pointer.
pub fn new() -> Multi {
    let ptr = unsafe {
        crate::init();
        curl_sys::curl_multi_init()
    };
    assert!(!ptr.is_null());

    let raw = Arc::new(RawMulti { handle: ptr });
    let data = Box::new(MultiData {
        socket: Box::new(|_, _, _| ()),
        timer: Box::new(|_| true),
    });
    Multi { raw, data }
}
/// Registers `f` as the socket callback of this multi handle.
///
/// The closure is invoked with the socket, the event value wrapped in
/// `SocketEvents`, and the per-socket pointer value converted to `usize`.
///
/// # Errors
///
/// Returns the `MultiError` produced if setting either libcurl option fails.
pub fn socket_function<F>(&mut self, f: F) -> Result<(), MultiError>
where
    F: FnMut(Socket, SocketEvents, usize) + Send + 'static,
{
    // Erase the closure type once so the real work is not monomorphised.
    let boxed: Box<dyn FnMut(Socket, SocketEvents, usize) + Send> = Box::new(f);
    self._socket_function(boxed)
}
/// Non-generic worker behind `socket_function`.
///
/// Stores `f` in `self.data`, then installs an `extern "C"` trampoline with
/// `CURLMOPT_SOCKETFUNCTION` and a pointer to the boxed `MultiData` with
/// `CURLMOPT_SOCKETDATA`.
///
/// # Errors
///
/// Returns the `MultiError` from whichever `setopt_ptr` call fails first.
fn _socket_function(
    &mut self,
    f: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
) -> Result<(), MultiError> {
    // Trampoline handed to libcurl: recovers the `MultiData` from the user
    // pointer and forwards to the stored closure. It always returns 0.
    extern "C" fn socket_trampoline(
        _easy: *mut curl_sys::CURL,
        socket: curl_sys::curl_socket_t,
        what: c_int,
        userptr: *mut c_void,
        socketp: *mut c_void,
    ) -> c_int {
        // The closure runs inside `panic::catch`; its result is discarded.
        panic::catch(|| unsafe {
            let callback = &mut (*(userptr as *mut MultiData)).socket;
            callback(socket, SocketEvents { bits: what }, socketp as usize)
        });
        0
    }

    self.data.socket = f;

    let trampoline: curl_sys::curl_socket_callback = socket_trampoline;
    self.setopt_ptr(
        curl_sys::CURLMOPT_SOCKETFUNCTION,
        trampoline as usize as *const c_char,
    )?;

    // The user pointer is the address of the boxed callback storage.
    let data_ptr = &*self.data as *const _;
    self.setopt_ptr(curl_sys::CURLMOPT_SOCKETDATA, data_ptr as *const c_char)?;
    Ok(())
}
/// Associates `token` with `socket` by calling `curl_multi_assign`.
///
/// The token is passed to libcurl as a pointer-sized value.
/// NOTE(review): presumably this is the value later delivered as the `usize`
/// argument of the socket callback -- confirm against libcurl docs.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn assign(&self, socket: Socket, token: usize) -> Result<(), MultiError> {
    let code = unsafe { curl_sys::curl_multi_assign(self.raw.handle, socket, token as *mut _) };
    cvt(code)
}
/// Registers `f` as the timer callback of this multi handle.
///
/// The closure receives `None` when libcurl reports a timeout of -1 and
/// `Some(duration)` (milliseconds) otherwise. Returning `true` makes the
/// underlying C callback return 0; returning `false` makes it return -1.
///
/// # Errors
///
/// Returns the `MultiError` produced if setting either libcurl option fails.
pub fn timer_function<F>(&mut self, f: F) -> Result<(), MultiError>
where
    F: FnMut(Option<Duration>) -> bool + Send + 'static,
{
    // Erase the closure type once so the real work is not monomorphised.
    let boxed: Box<dyn FnMut(Option<Duration>) -> bool + Send> = Box::new(f);
    self._timer_function(boxed)
}
/// Non-generic worker behind `timer_function`.
///
/// Stores `f` in `self.data`, then installs an `extern "C"` trampoline with
/// `CURLMOPT_TIMERFUNCTION` and a pointer to the boxed `MultiData` with
/// `CURLMOPT_TIMERDATA`.
///
/// # Errors
///
/// Returns the `MultiError` from whichever `setopt_ptr` call fails first.
fn _timer_function(
    &mut self,
    f: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
) -> Result<(), MultiError> {
    // Trampoline handed to libcurl: translates the millisecond timeout into
    // an `Option<Duration>` and maps the closure's `bool` to 0 / -1.
    extern "C" fn timer_trampoline(
        _multi: *mut curl_sys::CURLM,
        timeout_ms: c_long,
        user: *mut c_void,
    ) -> c_int {
        let keep_going = panic::catch(|| unsafe {
            let callback = &mut (*(user as *mut MultiData)).timer;
            // -1 is the "no timeout" marker.
            let timeout = if timeout_ms == -1 {
                None
            } else {
                Some(Duration::from_millis(timeout_ms as u64))
            };
            callback(timeout)
        })
        // No value from `panic::catch` is treated like a `false` return.
        .unwrap_or(false);

        if !keep_going {
            return -1;
        }
        0
    }

    self.data.timer = f;

    let trampoline: curl_sys::curl_multi_timer_callback = timer_trampoline;
    self.setopt_ptr(
        curl_sys::CURLMOPT_TIMERFUNCTION,
        trampoline as usize as *const c_char,
    )?;

    // The user pointer is the address of the boxed callback storage.
    let data_ptr = &*self.data as *const _;
    self.setopt_ptr(curl_sys::CURLMOPT_TIMERDATA, data_ptr as *const c_char)?;
    Ok(())
}
pub fn pipelining(&mut self, http_1: bool, multiplex: bool) -> Result<(), MultiError> {
let bitmask = if http_1 { curl_sys::CURLPIPE_HTTP1 } else { 0 }
| if multiplex {
curl_sys::CURLPIPE_MULTIPLEX
} else {
0
};
self.setopt_long(curl_sys::CURLMOPT_PIPELINING, bitmask)
}
/// Sets `CURLMOPT_MAX_HOST_CONNECTIONS` to `val` (converted with `as c_long`).
///
/// # Errors
///
/// Returns a `MultiError` if libcurl rejects the option.
pub fn set_max_host_connections(&mut self, val: usize) -> Result<(), MultiError> {
    let limit = val as c_long;
    self.setopt_long(curl_sys::CURLMOPT_MAX_HOST_CONNECTIONS, limit)
}
/// Sets `CURLMOPT_MAX_TOTAL_CONNECTIONS` to `val` (converted with `as c_long`).
///
/// # Errors
///
/// Returns a `MultiError` if libcurl rejects the option.
pub fn set_max_total_connections(&mut self, val: usize) -> Result<(), MultiError> {
    let limit = val as c_long;
    self.setopt_long(curl_sys::CURLMOPT_MAX_TOTAL_CONNECTIONS, limit)
}
/// Sets `CURLMOPT_MAXCONNECTS` to `val` (converted with `as c_long`).
///
/// # Errors
///
/// Returns a `MultiError` if libcurl rejects the option.
pub fn set_max_connects(&mut self, val: usize) -> Result<(), MultiError> {
    let limit = val as c_long;
    self.setopt_long(curl_sys::CURLMOPT_MAXCONNECTS, limit)
}
/// Sets `CURLMOPT_MAX_PIPELINE_LENGTH` to `val` (converted with `as c_long`).
///
/// # Errors
///
/// Returns a `MultiError` if libcurl rejects the option.
pub fn set_pipeline_length(&mut self, val: usize) -> Result<(), MultiError> {
    let length = val as c_long;
    self.setopt_long(curl_sys::CURLMOPT_MAX_PIPELINE_LENGTH, length)
}
/// Sets `CURLMOPT_MAX_CONCURRENT_STREAMS` to `val` (converted with
/// `as c_long`).
///
/// # Errors
///
/// Returns a `MultiError` if libcurl rejects the option.
pub fn set_max_concurrent_streams(&mut self, val: usize) -> Result<(), MultiError> {
    let limit = val as c_long;
    self.setopt_long(curl_sys::CURLMOPT_MAX_CONCURRENT_STREAMS, limit)
}
/// Calls `curl_multi_setopt` with a `c_long` value and converts the returned
/// code into a `Result` via `cvt`.
fn setopt_long(&mut self, opt: curl_sys::CURLMoption, val: c_long) -> Result<(), MultiError> {
    let code = unsafe { curl_sys::curl_multi_setopt(self.raw.handle, opt, val) };
    cvt(code)
}
/// Calls `curl_multi_setopt` with a pointer value and converts the returned
/// code into a `Result` via `cvt`.
///
/// Callers in this file use it for both callback function pointers and
/// callback user-data pointers, each cast to `*const c_char`.
fn setopt_ptr(
    &mut self,
    opt: curl_sys::CURLMoption,
    val: *const c_char,
) -> Result<(), MultiError> {
    let code = unsafe { curl_sys::curl_multi_setopt(self.raw.handle, opt, val) };
    cvt(code)
}
/// Adds `easy` to this multi handle with `curl_multi_add_handle`.
///
/// `easy.transfer()` is called first.
/// NOTE(review): presumably this resets per-transfer state on the handle --
/// confirm against `Easy::transfer`.
///
/// On success the handle is returned wrapped in an `EasyHandle`, whose guard
/// removes it from the multi handle again when dropped.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl refuses the handle; `easy` is consumed
/// and dropped in that case.
pub fn add(&self, mut easy: Easy) -> Result<EasyHandle, MultiError> {
    easy.transfer();

    let code = unsafe { curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()) };
    cvt(code)?;

    let guard = DetachGuard {
        multi: self.raw.clone(),
        easy: easy.raw(),
    };
    Ok(EasyHandle {
        guard,
        easy,
        _marker: marker::PhantomData,
    })
}
/// Adds `easy` to this multi handle with `curl_multi_add_handle`.
///
/// On success the handle is returned wrapped in an `Easy2Handle`, whose guard
/// removes it from the multi handle again when dropped.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl refuses the handle; `easy` is consumed
/// and dropped in that case.
pub fn add2<H>(&self, easy: Easy2<H>) -> Result<Easy2Handle<H>, MultiError> {
    let code = unsafe { curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()) };
    cvt(code)?;

    let guard = DetachGuard {
        multi: self.raw.clone(),
        easy: easy.raw(),
    };
    Ok(Easy2Handle {
        guard,
        easy,
        _marker: marker::PhantomData,
    })
}
/// Removes a previously added handle and gives back the underlying `Easy`.
///
/// # Errors
///
/// Returns the `MultiError` from `curl_multi_remove_handle`. The handle is
/// dropped in that case, and its guard attempts the removal once more while
/// being dropped.
pub fn remove(&self, mut easy: EasyHandle) -> Result<Easy, MultiError> {
    // `easy` is accessed by field (not destructured) so that on failure the
    // whole handle is dropped with its guard first.
    match easy.guard.detach() {
        Ok(()) => Ok(easy.easy),
        Err(err) => Err(err),
    }
}
/// Removes a previously added handle and gives back the underlying
/// `Easy2<H>`.
///
/// # Errors
///
/// Returns the `MultiError` from `curl_multi_remove_handle`. The handle is
/// dropped in that case, and its guard attempts the removal once more while
/// being dropped.
pub fn remove2<H>(&self, mut easy: Easy2Handle<H>) -> Result<Easy2<H>, MultiError> {
    // `easy` is accessed by field (not destructured) so that on failure the
    // whole handle is dropped with its guard first.
    match easy.guard.detach() {
        Ok(()) => Ok(easy.easy),
        Err(err) => Err(err),
    }
}
/// Invokes `f` once for every message currently returned by
/// `curl_multi_info_read`.
///
/// Each `Message` borrows this `Multi` and is only valid for the duration of
/// the callback invocation it is passed to.
pub fn messages<F>(&self, mut f: F)
where
    F: FnMut(Message),
{
    // Hand the closure over as a trait object so the loop is compiled once.
    let callback: &mut dyn FnMut(Message) = &mut f;
    self._messages(callback)
}
/// Non-generic worker behind `messages`.
///
/// Calls `curl_multi_info_read` repeatedly, passing each non-null result to
/// `f`, and stops at the first null pointer.
fn _messages(&self, f: &mut dyn FnMut(Message)) {
    // Receives libcurl's "messages remaining" count; not otherwise used.
    let mut remaining = 0;
    loop {
        let msg = unsafe { curl_sys::curl_multi_info_read(self.raw.handle, &mut remaining) };
        if msg.is_null() {
            return;
        }
        f(Message {
            ptr: msg,
            _multi: self,
        });
    }
}
/// Reports activity on `socket` by calling `curl_multi_socket_action` with
/// the flags collected in `events`.
///
/// Returns the handle count libcurl writes to its output parameter, cast to
/// `u32`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn action(&self, socket: Socket, events: &Events) -> Result<u32, MultiError> {
    let mut running = 0;
    let code = unsafe {
        curl_sys::curl_multi_socket_action(self.raw.handle, socket, events.bits, &mut running)
    };
    cvt(code)?;
    Ok(running as u32)
}
/// Calls `curl_multi_socket_action` with `CURL_SOCKET_BAD` and no event
/// flags.
/// NOTE(review): presumably this is how a timer expiry is reported to
/// libcurl -- confirm against libcurl docs.
///
/// Returns the handle count libcurl writes to its output parameter, cast to
/// `u32`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn timeout(&self) -> Result<u32, MultiError> {
    let mut running = 0;
    let code = unsafe {
        curl_sys::curl_multi_socket_action(
            self.raw.handle,
            curl_sys::CURL_SOCKET_BAD,
            0,
            &mut running,
        )
    };
    cvt(code)?;
    Ok(running as u32)
}
/// Queries `curl_multi_timeout`.
///
/// Returns `None` when libcurl reports -1 and otherwise the reported number
/// of milliseconds as a `Duration`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn get_timeout(&self) -> Result<Option<Duration>, MultiError> {
    let mut millis = 0;
    let code = unsafe { curl_sys::curl_multi_timeout(self.raw.handle, &mut millis) };
    cvt(code)?;

    let timeout = match millis {
        // -1 is the "no timeout" marker.
        -1 => None,
        n => Some(Duration::from_millis(n as u64)),
    };
    Ok(timeout)
}
/// Calls `curl_multi_wait` with the extra descriptors in `waitfds`.
///
/// `timeout` is converted to whole milliseconds by `timeout_i32`, which caps
/// it at `i32::MAX`. The entries of `waitfds` are passed to libcurl by
/// mutable pointer, so their reported events may be updated.
///
/// Returns the count libcurl writes to its output parameter, cast to `u32`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn wait(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
    let timeout_ms = Multi::timeout_i32(timeout);
    let extra_count = waitfds.len() as u32;
    let mut ready = 0;
    let code = unsafe {
        curl_sys::curl_multi_wait(
            self.raw.handle,
            waitfds.as_mut_ptr() as *mut _,
            extra_count,
            timeout_ms,
            &mut ready,
        )
    };
    cvt(code)?;
    Ok(ready as u32)
}
/// Converts `timeout` to whole milliseconds, saturating at `i32::MAX`.
///
/// Sub-millisecond precision is truncated. The comparison is done on the
/// full `u128` millisecond count so that durations just below the limit
/// (e.g. 2_147_483 s plus more than 647 ms) saturate instead of overflowing
/// the `i32` arithmetic.
fn timeout_i32(timeout: Duration) -> i32 {
    let millis = timeout.as_millis();
    if millis > i32::MAX as u128 {
        i32::MAX
    } else {
        // In range by the check above, so the cast cannot truncate.
        millis as i32
    }
}
/// Calls `curl_multi_poll` with the extra descriptors in `waitfds`.
///
/// `timeout` is converted to whole milliseconds by `timeout_i32`, which caps
/// it at `i32::MAX`. The entries of `waitfds` are passed to libcurl by
/// mutable pointer, so their reported events may be updated.
///
/// Returns the count libcurl writes to its output parameter, cast to `u32`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
#[cfg(feature = "poll_7_68_0")]
pub fn poll(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
    let timeout_ms = Multi::timeout_i32(timeout);
    let extra_count = waitfds.len() as u32;
    let mut ready = 0;
    let code = unsafe {
        curl_sys::curl_multi_poll(
            self.raw.handle,
            waitfds.as_mut_ptr() as *mut _,
            extra_count,
            timeout_ms,
            &mut ready,
        )
    };
    cvt(code)?;
    Ok(ready as u32)
}
/// Creates a `MultiWaker` for this multi handle.
///
/// The waker holds only a weak reference, so it does not keep the handle
/// alive.
#[cfg(feature = "poll_7_68_0")]
pub fn waker(&self) -> MultiWaker {
    let weak = Arc::downgrade(&self.raw);
    MultiWaker::new(weak)
}
/// Calls `curl_multi_perform`.
///
/// Returns the handle count libcurl writes to its output parameter, cast to
/// `u32`.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn perform(&self) -> Result<u32, MultiError> {
    let mut running = 0;
    let code = unsafe { curl_sys::curl_multi_perform(self.raw.handle, &mut running) };
    cvt(code)?;
    Ok(running as u32)
}
/// Calls `curl_multi_fdset` with up to three descriptor sets.
///
/// Each `None` argument is passed to libcurl as a null pointer. Returns
/// `None` when libcurl reports a maximum descriptor of -1 and `Some(max)`
/// otherwise.
///
/// # Errors
///
/// Returns a `MultiError` if libcurl reports anything other than `CURLM_OK`.
pub fn fdset2(
    &self,
    read: Option<&mut curl_sys::fd_set>,
    write: Option<&mut curl_sys::fd_set>,
    except: Option<&mut curl_sys::fd_set>,
) -> Result<Option<i32>, MultiError> {
    let read = read.map_or(ptr::null_mut(), |set| set as *mut curl_sys::fd_set);
    let write = write.map_or(ptr::null_mut(), |set| set as *mut curl_sys::fd_set);
    let except = except.map_or(ptr::null_mut(), |set| set as *mut curl_sys::fd_set);

    let mut max_fd = 0;
    let code =
        unsafe { curl_sys::curl_multi_fdset(self.raw.handle, read, write, except, &mut max_fd) };
    cvt(code)?;

    match max_fd {
        -1 => Ok(None),
        fd => Ok(Some(fd)),
    }
}
// Deprecated no-op: always returns `Ok(())` and performs no cleanup. The
// actual `curl_multi_cleanup` call happens in `RawMulti`'s `Drop` impl.
#[doc(hidden)]
#[deprecated(
since = "0.4.30",
note = "cannot close safely without consuming self; \
will be changed or removed in a future release"
)]
pub fn close(&self) -> Result<(), MultiError> {
Ok(())
}
/// Returns the raw `CURLM` pointer held by this multi handle.
///
/// The pointer stays owned by this object; `curl_multi_cleanup` is called on
/// it when the last reference to the underlying `RawMulti` is dropped.
pub fn raw(&self) -> *mut curl_sys::CURLM {
self.raw.handle
}
}
impl Drop for RawMulti {
    /// Releases the multi handle with `curl_multi_cleanup`.
    ///
    /// A failure code cannot be propagated out of `drop`, so it is converted
    /// by `cvt` and then discarded.
    fn drop(&mut self) {
        let code = unsafe { curl_sys::curl_multi_cleanup(self.handle) };
        drop(cvt(code));
    }
}
#[cfg(feature = "poll_7_68_0")]
impl MultiWaker {
    /// Wraps a weak reference to the raw multi handle.
    fn new(raw: std::sync::Weak<RawMulti>) -> Self {
        MultiWaker { raw }
    }

    /// Calls `curl_multi_wakeup` on the associated multi handle.
    ///
    /// # Errors
    ///
    /// Returns a `MultiError` built from `CURLM_BAD_HANDLE` if the multi
    /// handle has already been dropped, or the error reported by libcurl.
    pub fn wakeup(&self) -> Result<(), MultiError> {
        match self.raw.upgrade() {
            // The upgraded reference keeps the handle alive during the call.
            Some(raw) => cvt(unsafe { curl_sys::curl_multi_wakeup(raw.handle) }),
            None => Err(MultiError::new(curl_sys::CURLM_BAD_HANDLE)),
        }
    }
}
/// Converts a `CURLMcode` into a `Result`: `CURLM_OK` becomes `Ok(())` and
/// every other code is wrapped in a `MultiError`.
fn cvt(code: curl_sys::CURLMcode) -> Result<(), MultiError> {
    if code != curl_sys::CURLM_OK {
        return Err(MultiError::new(code));
    }
    Ok(())
}
impl fmt::Debug for Multi {
    /// Formats as `Multi { raw: .. }`; the callback storage is omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Multi");
        out.field("raw", &self.raw);
        out.finish()
    }
}
// Generates forwarding getters on the handle wrappers (`EasyHandle`,
// `Easy2Handle`). Each generated method takes `&mut self` and delegates to the
// method of the same name on `self.easy`.
macro_rules! impl_easy_getters {
// Entry point: expands to the full list of getters and their return types.
() => {
impl_easy_getters! {
time_condition_unmet -> bool,
effective_url -> Option<&str>,
effective_url_bytes -> Option<&[u8]>,
response_code -> u32,
http_connectcode -> u32,
filetime -> Option<i64>,
download_size -> f64,
content_length_download -> f64,
total_time -> Duration,
namelookup_time -> Duration,
connect_time -> Duration,
appconnect_time -> Duration,
pretransfer_time -> Duration,
starttransfer_time -> Duration,
redirect_time -> Duration,
redirect_count -> u32,
redirect_url -> Option<&str>,
redirect_url_bytes -> Option<&[u8]>,
header_size -> u64,
request_size -> u64,
content_type -> Option<&str>,
content_type_bytes -> Option<&[u8]>,
os_errno -> i32,
primary_ip -> Option<&str>,
primary_port -> u16,
local_ip -> Option<&str>,
local_port -> u16,
cookies -> List,
}
};
// List form: for every `name -> type` pair, build the doc string (a link to
// the `Easy2` method of the same name) and dispatch to the single-item arm.
($($name:ident -> $ret:ty,)*) => {
$(
impl_easy_getters!($name, $ret, concat!(
"Same as [`Easy2::",
stringify!($name),
"`](../easy/struct.Easy2.html#method.",
stringify!($name),
")."
));
)*
};
// Single-item form: emits one documented forwarding method.
($name:ident, $ret:ty, $doc:expr) => {
#[doc = $doc]
pub fn $name(&mut self) -> Result<$ret, Error> {
self.easy.$name()
}
};
}
impl EasyHandle {
    /// Stores `token` on the easy handle through `CURLOPT_PRIVATE`.
    ///
    /// `Message::token` reads the value back via `CURLINFO_PRIVATE`.
    ///
    /// # Errors
    ///
    /// Returns an `Error` if `curl_easy_setopt` fails.
    pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
        let handle = self.easy.raw();
        let code = unsafe { curl_sys::curl_easy_setopt(handle, curl_sys::CURLOPT_PRIVATE, token) };
        crate::cvt(code)
    }

    // Forwarding getters that delegate to the wrapped easy handle.
    impl_easy_getters!();

    /// Forwards to `unpause_read` on the wrapped easy handle.
    pub fn unpause_read(&self) -> Result<(), Error> {
        let easy = &self.easy;
        easy.unpause_read()
    }

    /// Forwards to `unpause_write` on the wrapped easy handle.
    pub fn unpause_write(&self) -> Result<(), Error> {
        let easy = &self.easy;
        easy.unpause_write()
    }

    /// Returns the raw `CURL` pointer of the wrapped easy handle.
    pub fn raw(&self) -> *mut curl_sys::CURL {
        let easy = &self.easy;
        easy.raw()
    }
}
impl fmt::Debug for EasyHandle {
    /// Delegates to the `Debug` output of the wrapped easy handle.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.easy, f)
    }
}
impl<H> Easy2Handle<H> {
    /// Returns a shared reference to the handler `H` of the wrapped
    /// `Easy2<H>`.
    pub fn get_ref(&self) -> &H {
        let easy = &self.easy;
        easy.get_ref()
    }

    /// Returns a mutable reference to the handler `H` of the wrapped
    /// `Easy2<H>`.
    pub fn get_mut(&mut self) -> &mut H {
        let easy = &mut self.easy;
        easy.get_mut()
    }

    /// Stores `token` on the easy handle through `CURLOPT_PRIVATE`.
    ///
    /// `Message::token` reads the value back via `CURLINFO_PRIVATE`.
    ///
    /// # Errors
    ///
    /// Returns an `Error` if `curl_easy_setopt` fails.
    pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
        let handle = self.easy.raw();
        let code = unsafe { curl_sys::curl_easy_setopt(handle, curl_sys::CURLOPT_PRIVATE, token) };
        crate::cvt(code)
    }

    // Forwarding getters that delegate to the wrapped easy handle.
    impl_easy_getters!();

    /// Forwards to `unpause_read` on the wrapped easy handle.
    pub fn unpause_read(&self) -> Result<(), Error> {
        let easy = &self.easy;
        easy.unpause_read()
    }

    /// Forwards to `unpause_write` on the wrapped easy handle.
    pub fn unpause_write(&self) -> Result<(), Error> {
        let easy = &self.easy;
        easy.unpause_write()
    }

    /// Returns the raw `CURL` pointer of the wrapped easy handle.
    pub fn raw(&self) -> *mut curl_sys::CURL {
        let easy = &self.easy;
        easy.raw()
    }
}
impl<H: fmt::Debug> fmt::Debug for Easy2Handle<H> {
    /// Delegates to the `Debug` output of the wrapped easy handle.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Debug::fmt(&self.easy, f)
    }
}
impl DetachGuard {
    /// Removes the easy handle from the multi handle if that has not happened
    /// yet.
    ///
    /// After a successful `curl_multi_remove_handle` the stored pointer is
    /// nulled, so further calls return `Ok(())` without touching libcurl.
    ///
    /// # Errors
    ///
    /// Returns the `MultiError` from `curl_multi_remove_handle`; the pointer
    /// is left in place so a later call tries again.
    fn detach(&mut self) -> Result<(), MultiError> {
        if self.easy.is_null() {
            // Already detached.
            return Ok(());
        }

        let code = unsafe { curl_sys::curl_multi_remove_handle(self.multi.handle, self.easy) };
        cvt(code)?;

        self.easy = ptr::null_mut();
        Ok(())
    }
}
impl Drop for DetachGuard {
    /// Detaches on drop; any error is discarded because `drop` cannot report
    /// it.
    fn drop(&mut self) {
        drop(self.detach());
    }
}
impl<'multi> Message<'multi> {
    /// Returns the transfer result carried by this message.
    ///
    /// Yields `None` unless the message kind is `CURLMSG_DONE`; otherwise the
    /// message's data value is interpreted as a `CURLcode` and converted with
    /// `crate::cvt`.
    pub fn result(&self) -> Option<Result<(), Error>> {
        unsafe {
            if (*self.ptr).msg != curl_sys::CURLMSG_DONE {
                return None;
            }
            Some(crate::cvt((*self.ptr).data as curl_sys::CURLcode))
        }
    }

    /// Like `result`, but only for messages that belong to `handle`.
    ///
    /// Returns `None` if the message is for a different easy handle. When the
    /// result is an error and the handle yields an error buffer, that text is
    /// attached to the error with `set_extra`.
    pub fn result_for(&self, handle: &EasyHandle) -> Option<Result<(), Error>> {
        if !self.is_for(handle) {
            return None;
        }
        self.result().map(|outcome| {
            outcome.map_err(|mut err| {
                if let Some(extra) = handle.easy.take_error_buf() {
                    err.set_extra(extra);
                }
                err
            })
        })
    }

    /// Like `result`, but only for messages that belong to `handle`.
    ///
    /// Returns `None` if the message is for a different easy handle. When the
    /// result is an error and the handle yields an error buffer, that text is
    /// attached to the error with `set_extra`.
    pub fn result_for2<H>(&self, handle: &Easy2Handle<H>) -> Option<Result<(), Error>> {
        if !self.is_for2(handle) {
            return None;
        }
        self.result().map(|outcome| {
            outcome.map_err(|mut err| {
                if let Some(extra) = handle.easy.take_error_buf() {
                    err.set_extra(extra);
                }
                err
            })
        })
    }

    /// Reports whether this message's easy-handle pointer equals the raw
    /// pointer of `handle`.
    pub fn is_for(&self, handle: &EasyHandle) -> bool {
        let owner = unsafe { (*self.ptr).easy_handle };
        owner == handle.easy.raw()
    }

    /// Reports whether this message's easy-handle pointer equals the raw
    /// pointer of `handle`.
    pub fn is_for2<H>(&self, handle: &Easy2Handle<H>) -> bool {
        let owner = unsafe { (*self.ptr).easy_handle };
        owner == handle.easy.raw()
    }

    /// Reads the `CURLINFO_PRIVATE` value of the easy handle this message
    /// refers to, i.e. the value stored by `set_token`.
    ///
    /// # Errors
    ///
    /// Returns an `Error` if `curl_easy_getinfo` fails.
    pub fn token(&self) -> Result<usize, Error> {
        let mut token = 0usize;
        let code = unsafe {
            curl_sys::curl_easy_getinfo(
                (*self.ptr).easy_handle,
                curl_sys::CURLINFO_PRIVATE,
                &mut token,
            )
        };
        crate::cvt(code).map(|()| token)
    }
}
impl<'a> fmt::Debug for Message<'a> {
    /// Formats as `Message { ptr: .. }`, showing only the raw pointer.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Message");
        out.field("ptr", &self.ptr);
        out.finish()
    }
}
impl Events {
    /// Creates an empty set with no flags selected.
    pub fn new() -> Events {
        Events { bits: 0 }
    }

    /// Sets or clears `CURL_CSELECT_IN`; returns `self` for chaining.
    pub fn input(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_IN, val)
    }

    /// Sets or clears `CURL_CSELECT_OUT`; returns `self` for chaining.
    pub fn output(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_OUT, val)
    }

    /// Sets or clears `CURL_CSELECT_ERR`; returns `self` for chaining.
    pub fn error(&mut self, val: bool) -> &mut Events {
        self.flag(curl_sys::CURL_CSELECT_ERR, val)
    }

    /// Turns the bits of `flag` on when `val` is `true` and off otherwise.
    fn flag(&mut self, flag: c_int, val: bool) -> &mut Events {
        self.bits = if val {
            self.bits | flag
        } else {
            self.bits & !flag
        };
        self
    }
}
impl fmt::Debug for Events {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Events")
.field("input", &(self.bits & curl_sys::CURL_CSELECT_IN != 0))
.field("output", &(self.bits & curl_sys::CURL_CSELECT_OUT != 0))
.field("error", &(self.bits & curl_sys::CURL_CSELECT_ERR != 0))
.finish()
}
}
impl SocketEvents {
    /// True when every bit of `CURL_POLL_IN` is set in the event value.
    pub fn input(&self) -> bool {
        let mask = curl_sys::CURL_POLL_IN;
        (self.bits & mask) == mask
    }

    /// True when every bit of `CURL_POLL_OUT` is set in the event value.
    pub fn output(&self) -> bool {
        let mask = curl_sys::CURL_POLL_OUT;
        (self.bits & mask) == mask
    }

    /// True when every bit of `CURL_POLL_INOUT` is set in the event value.
    pub fn input_and_output(&self) -> bool {
        let mask = curl_sys::CURL_POLL_INOUT;
        (self.bits & mask) == mask
    }

    /// True when every bit of `CURL_POLL_REMOVE` is set in the event value.
    pub fn remove(&self) -> bool {
        let mask = curl_sys::CURL_POLL_REMOVE;
        (self.bits & mask) == mask
    }
}
impl fmt::Debug for SocketEvents {
    /// Formats as `SocketEvents { input, output, remove }`.
    ///
    /// The struct is labelled with its own name so that its output cannot be
    /// mistaken for that of the separate `Events` type.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SocketEvents")
            .field("input", &self.input())
            .field("output", &self.output())
            .field("remove", &self.remove())
            .finish()
    }
}
impl WaitFd {
    /// Creates a descriptor entry with `fd`, `events` and `revents` all zero.
    pub fn new() -> WaitFd {
        let inner = curl_sys::curl_waitfd {
            fd: 0,
            events: 0,
            revents: 0,
        };
        WaitFd { inner }
    }

    /// Sets the descriptor to wait on.
    pub fn set_fd(&mut self, fd: Socket) {
        self.inner.fd = fd;
    }

    /// Sets or clears `CURL_WAIT_POLLIN` in the requested events.
    pub fn poll_on_read(&mut self, val: bool) -> &mut WaitFd {
        self.flag(curl_sys::CURL_WAIT_POLLIN, val)
    }

    /// Sets or clears `CURL_WAIT_POLLPRI` in the requested events.
    pub fn poll_on_priority_read(&mut self, val: bool) -> &mut WaitFd {
        self.flag(curl_sys::CURL_WAIT_POLLPRI, val)
    }

    /// Sets or clears `CURL_WAIT_POLLOUT` in the requested events.
    pub fn poll_on_write(&mut self, val: bool) -> &mut WaitFd {
        self.flag(curl_sys::CURL_WAIT_POLLOUT, val)
    }

    /// Turns the bits of `flag` on in `events` when `val` is `true` and off
    /// otherwise.
    fn flag(&mut self, flag: c_short, val: bool) -> &mut WaitFd {
        self.inner.events = if val {
            self.inner.events | flag
        } else {
            self.inner.events & !flag
        };
        self
    }

    /// True when `CURL_WAIT_POLLIN` is set in the reported events.
    pub fn received_read(&self) -> bool {
        let mask = curl_sys::CURL_WAIT_POLLIN;
        (self.inner.revents & mask) == mask
    }

    /// True when `CURL_WAIT_POLLPRI` is set in the reported events.
    pub fn received_priority_read(&self) -> bool {
        let mask = curl_sys::CURL_WAIT_POLLPRI;
        (self.inner.revents & mask) == mask
    }

    /// True when `CURL_WAIT_POLLOUT` is set in the reported events.
    pub fn received_write(&self) -> bool {
        let mask = curl_sys::CURL_WAIT_POLLOUT;
        (self.inner.revents & mask) == mask
    }
}
#[cfg(unix)]
impl From<pollfd> for WaitFd {
    /// Builds a `WaitFd` from a `pollfd`.
    ///
    /// The descriptor is copied, `POLLIN` / `POLLPRI` / `POLLOUT` in the
    /// requested events are translated to their `CURL_WAIT_*` counterparts,
    /// and `revents` starts at zero (the source's `revents` is not copied).
    fn from(pfd: pollfd) -> WaitFd {
        // (poll flag, matching curl wait flag)
        let translation = [
            (POLLIN, curl_sys::CURL_WAIT_POLLIN),
            (POLLPRI, curl_sys::CURL_WAIT_POLLPRI),
            (POLLOUT, curl_sys::CURL_WAIT_POLLOUT),
        ];

        let mut events = 0;
        for &(poll_bit, wait_bit) in translation.iter() {
            if pfd.events & poll_bit == poll_bit {
                events |= wait_bit;
            }
        }

        let inner = curl_sys::curl_waitfd {
            fd: pfd.fd,
            events,
            revents: 0,
        };
        WaitFd { inner }
    }
}
impl fmt::Debug for WaitFd {
    /// Formats as `WaitFd { fd, events, revents }`.
    ///
    /// Each label prints its own field of the wrapped `curl_waitfd`, so the
    /// requested and reported event masks are actually visible.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("WaitFd")
            .field("fd", &self.inner.fd)
            .field("events", &self.inner.events)
            .field("revents", &self.inner.revents)
            .finish()
    }
}