use std::{
cell::RefCell,
collections,
fmt,
mem,
os::raw,
ptr::NonNull,
rc::{
Rc,
Weak,
},
result,
time::Duration,
};
use crate::{
as_void_ptr,
ConnectError,
ConnectionEvent,
ConnectionFlags,
Context,
Error,
error::IntoResult,
FFI,
ffi_types::Nullable,
Result,
Stanza,
StreamError,
void_ptr_as,
};
type ConnectionCallback<'cb, 'cx> = dyn FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, ConnectionEvent, i32, Option<StreamError>) + Send + 'cb;
type ConnectionFatHandler<'cb, 'cx> = FatHandler<'cb, 'cx, ConnectionCallback<'cb, 'cx>, ()>;
type Handlers<H> = Vec<Box<H>>;
type TimedCallback<'cb, 'cx> = dyn FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>) -> bool + Send + 'cb;
type TimedFatHandler<'cb, 'cx> = FatHandler<'cb, 'cx, TimedCallback<'cb, 'cx>, ()>;
type StanzaCallback<'cb, 'cx> = dyn FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, &Stanza) -> bool + Send + 'cb;
type StanzaFatHandler<'cb, 'cx> = FatHandler<'cb, 'cx, StanzaCallback<'cb, 'cx>, Option<String>>;
struct FatHandlers<'cb, 'cx> {
connection: Option<ConnectionFatHandler<'cb, 'cx>>,
timed: Handlers<TimedFatHandler<'cb, 'cx>>,
stanza: Handlers<StanzaFatHandler<'cb, 'cx>>,
}
impl fmt::Debug for FatHandlers<'_, '_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut s = f.debug_struct("FatHandlers");
s.field("connection", &if self.connection.is_some() { "set" } else { "unset" });
s.field("timed", &format!("{} handlers", self.timed.len()));
s.field("stanza", &format!("{} handlers", self.stanza.len()));
s.finish()
}
}
#[derive(Debug)]
pub struct Connection<'cb, 'cx> {
inner: NonNull<sys::xmpp_conn_t>,
ctx: Option<Context<'cx, 'cb>>,
owned: bool,
fat_handlers: Rc<RefCell<FatHandlers<'cb, 'cx>>>,
}
impl<'cb, 'cx> Connection<'cb, 'cx> {
pub fn new(ctx: Context<'cx, 'cb>) -> Self {
unsafe {
Self::from_inner(sys::xmpp_conn_new(ctx.as_inner()), ctx, Rc::new(RefCell::new(FatHandlers {
connection: None,
timed: Vec::with_capacity(4),
stanza: Vec::with_capacity(4),
})))
}
}
#[inline]
unsafe fn with_inner(inner: *mut sys::xmpp_conn_t, ctx: Context<'cx, 'cb>, owned: bool, handlers: Rc<RefCell<FatHandlers<'cb, 'cx>>>) -> Self {
Connection { inner: NonNull::new(inner).expect("Cannot allocate memory for Connection"), ctx: Some(ctx), owned, fat_handlers: handlers }
}
unsafe fn from_inner(inner: *mut sys::xmpp_conn_t, ctx: Context<'cx, 'cb>, handlers: Rc<RefCell<FatHandlers<'cb, 'cx>>>) -> Self {
Self::with_inner(
inner,
ctx,
true,
handlers,
)
}
unsafe fn from_inner_ref_mut(inner: *mut sys::xmpp_conn_t, handlers: Rc<RefCell<FatHandlers<'cb, 'cx>>>) -> Self {
let ctx = Context::from_inner_ref(sys::xmpp_conn_get_context(inner));
Self::with_inner(inner, ctx, false, handlers)
}
extern "C" fn connection_handler_cb<CB>(conn: *mut sys::xmpp_conn_t, event: sys::xmpp_conn_event_t, error: raw::c_int,
stream_error: *mut sys::xmpp_stream_error_t, userdata: *mut raw::c_void) {
let connection_handler = unsafe { void_ptr_as::<ConnectionFatHandler>(userdata) };
if let Some(fat_handlers) = connection_handler.fat_handlers.upgrade() {
let mut conn = unsafe { Self::from_inner_ref_mut(conn, fat_handlers) };
let stream_error: Option<StreamError> = unsafe { stream_error.as_ref() }.map(|e| e.into());
(connection_handler.handler)(conn.context_detached(), &mut conn, event, error, stream_error);
}
}
extern "C" fn timed_handler_cb<CB>(conn: *mut sys::xmpp_conn_t, userdata: *mut raw::c_void) -> i32 {
let timed_handler = unsafe { void_ptr_as::<TimedFatHandler>(userdata) };
if let Some(fat_handlers) = timed_handler.fat_handlers.upgrade() {
let mut conn = unsafe { Self::from_inner_ref_mut(conn, fat_handlers) };
let res = (timed_handler.handler)(conn.context_detached(), &mut conn);
if !res {
Self::drop_fat_handler(&mut conn.fat_handlers.borrow_mut().timed, timed_handler);
}
res as _
} else {
0
}
}
extern "C" fn handler_cb<CB>(conn: *mut sys::xmpp_conn_t, stanza: *mut sys::xmpp_stanza_t, userdata: *mut raw::c_void) -> i32 {
let stanza_handler = unsafe { void_ptr_as::<StanzaFatHandler>(userdata) };
if let Some(fat_handlers) = stanza_handler.fat_handlers.upgrade() {
let mut conn = unsafe { Self::from_inner_ref_mut(conn, fat_handlers) };
let stanza = unsafe { Stanza::from_inner_ref(stanza) };
let res = (stanza_handler.handler)(conn.context_detached(), &mut conn, &stanza);
if !res {
Self::drop_fat_handler(&mut conn.fat_handlers.borrow_mut().stanza, stanza_handler);
}
res as _
} else {
0
}
}
fn store_fat_handler<CB: ?Sized, T>(fat_handlers: &mut Handlers<FatHandler<'cb, 'cx, CB, T>>, fat_handler: FatHandler<'cb, 'cx, CB, T>) -> Option<*const FatHandler<'cb, 'cx, CB, T>> {
if Self::get_fat_handler_pos_by_callback(fat_handlers, fat_handler.cb_addr).is_none() {
let handler = Box::new(fat_handler);
let out = &*handler as _;
fat_handlers.push(handler);
Some(out)
} else {
None
}
}
fn get_fat_handler_pos<CB: ?Sized, T>(fat_handlers: &Handlers<FatHandler<'cb, 'cx, CB, T>>, fat_handler_ptr: *const FatHandler<'cb, 'cx, CB, T>) -> Option<usize> {
#![allow(clippy::ptr_arg)]
fat_handlers.iter().position(|x| fat_handler_ptr == x.as_ref())
}
fn get_fat_handler_pos_by_callback<CB: ?Sized, T>(fat_handlers: &Handlers<FatHandler<'cb, 'cx, CB, T>>, cb_addr: *const ()) -> Option<usize> {
#![allow(clippy::ptr_arg)]
fat_handlers.iter().position(|x| cb_addr == x.cb_addr)
}
fn validate_fat_handler<'f, CB: ?Sized, T>(fat_handlers: &'f Handlers<FatHandler<'cb, 'cx, CB, T>>, fat_handler_ptr: *const FatHandler<'cb, 'cx, CB, T>) -> Option<&'f FatHandler<'cb, 'cx, CB, T>> {
#![allow(clippy::ptr_arg)]
Self::get_fat_handler_pos(fat_handlers, fat_handler_ptr).map(|pos| {
fat_handlers[pos].as_ref()
})
}
fn drop_fat_handler<CB: ?Sized, T>(fat_handlers: &mut Handlers<FatHandler<'cb, 'cx, CB, T>>, fat_handler_ptr: *const FatHandler<'cb, 'cx, CB, T>) -> Option<usize> {
if let Some(pos) = Self::get_fat_handler_pos(fat_handlers, fat_handler_ptr) {
fat_handlers.remove(pos);
Some(pos)
} else {
None
}
}
fn make_fat_handler<CB: ?Sized, T>(&self, handler: Box<CB>, cb_addr: *const (), extra: T) -> FatHandler<'cb, 'cx, CB, T> {
FatHandler {
fat_handlers: Rc::downgrade(&self.fat_handlers),
handler,
cb_addr,
extra
}
}
fn context_detached<'a>(&self) -> &'a Context<'cx, 'cb> {
unsafe { (self.ctx.as_ref().unwrap() as *const Context).as_ref() }.unwrap()
}
pub fn flags(&self) -> ConnectionFlags {
ConnectionFlags::from_bits(unsafe { sys::xmpp_conn_get_flags(self.inner.as_ptr()) }).unwrap()
}
pub fn set_flags(&mut self, flags: ConnectionFlags) -> Result<()> {
unsafe {
sys::xmpp_conn_set_flags(self.inner.as_mut(), flags.bits())
}.into_result()
}
pub fn jid(&self) -> Option<&str> {
unsafe {
FFI(sys::xmpp_conn_get_jid(self.inner.as_ptr())).receive()
}
}
pub fn bound_jid(&self) -> Option<&str> {
unsafe {
FFI(sys::xmpp_conn_get_bound_jid(self.inner.as_ptr())).receive()
}
}
pub fn set_jid(&mut self, jid: impl AsRef<str>) {
let jid = FFI(jid.as_ref()).send();
unsafe {
sys::xmpp_conn_set_jid(self.inner.as_mut(), jid.as_ptr())
}
}
pub fn pass(&self) -> Option<&str> {
unsafe {
FFI(sys::xmpp_conn_get_pass(self.inner.as_ptr())).receive()
}
}
pub fn set_pass(&mut self, pass: impl AsRef<str>) {
let pass = FFI(pass.as_ref()).send();
unsafe {
sys::xmpp_conn_set_pass(self.inner.as_mut(), pass.as_ptr())
}
}
pub fn disable_tls(&mut self) {
unsafe {
sys::xmpp_conn_disable_tls(self.inner.as_mut())
}
}
pub fn is_secured(&self) -> bool {
unsafe {
FFI(sys::xmpp_conn_is_secured(self.inner.as_ptr())).receive_bool()
}
}
pub fn set_keepalive(&mut self, timeout: Duration, interval: Duration) {
unsafe {
sys::xmpp_conn_set_keepalive(self.inner.as_mut(), timeout.as_secs() as _, interval.as_secs() as _)
}
}
pub fn connect_client<CB>(self, alt_host: Option<&str>, alt_port: impl Into<Option<u16>>, handler: CB) -> Result<Context<'cx, 'cb>, ConnectError<'cb, 'cx>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, ConnectionEvent, i32, Option<StreamError>) + Send + 'cb,
{
let mut me = self;
let alt_host = FFI(alt_host).send();
let alt_port: Nullable<_> = alt_port.into().into();
if me.jid().is_none() {
return Err(ConnectError {
conn: me,
error: Error::InvalidOperation,
});
}
let callback = Self::connection_handler_cb::<CB>;
let new_handler = Some(me.make_fat_handler(Box::new(handler) as _, callback as _, ()));
let old_handler = mem::replace(&mut me.fat_handlers.borrow_mut().connection, new_handler);
let out = unsafe {
sys::xmpp_connect_client(
me.inner.as_mut(),
alt_host.as_ptr(),
alt_port.val(),
Some(callback),
as_void_ptr(me.fat_handlers.borrow().connection.as_ref().unwrap()),
)
}.into_result();
match out {
Ok(_) => {
let mut out = me.ctx.take().expect("Internal context is empty, it must never happen");
out.consume_connection(me);
Ok(out)
},
Err(e) => {
me.fat_handlers.borrow_mut().connection = old_handler;
Err(ConnectError {
conn: me,
error: e,
})
}
}
}
pub fn connect_component<CB>(self, host: impl AsRef<str>, port: impl Into<Option<u16>>, handler: CB) -> Result<Context<'cx, 'cb>, ConnectError<'cb, 'cx>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, ConnectionEvent, i32, Option<StreamError>) + Send + 'cb,
{
let mut me = self;
let host = FFI(host.as_ref()).send();
let port: Nullable<_> = port.into().into();
let callback = Self::connection_handler_cb::<CB>;
let new_handler = Some(me.make_fat_handler(Box::new(handler) as _, callback as _, ()));
let old_handler = mem::replace(&mut me.fat_handlers.borrow_mut().connection, new_handler);
let out = unsafe {
sys::xmpp_connect_component(
me.inner.as_mut(),
host.as_ptr(),
port.val(),
Some(callback),
as_void_ptr(&me.fat_handlers.borrow().connection),
)
}.into_result();
match out {
Ok(_) => {
let mut out = me.ctx.take().expect("Internal context is empty, it must never happen");
out.consume_connection(me);
Ok(out)
},
Err(e) => {
me.fat_handlers.borrow_mut().connection = old_handler;
Err(ConnectError {
conn: me,
error: e,
})
}
}
}
pub fn connect_raw<CB>(self, alt_host: Option<&str>, alt_port: impl Into<Option<u16>>, handler: CB) -> Result<Context<'cx, 'cb>, ConnectError<'cb, 'cx>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, ConnectionEvent, i32, Option<StreamError>) + Send + 'cb,
{
let mut me = self;
let alt_host = FFI(alt_host).send();
let alt_port: Nullable<_> = alt_port.into().into();
if me.jid().is_none() {
return Err(ConnectError {
conn: me,
error: Error::InvalidOperation,
});
}
let callback = Self::connection_handler_cb::<CB>;
let new_handler = Some(me.make_fat_handler(Box::new(handler) as _, callback as _, ()));
let old_handler = mem::replace(&mut me.fat_handlers.borrow_mut().connection, new_handler);
let out = unsafe {
sys::xmpp_connect_raw(
me.inner.as_mut(),
alt_host.as_ptr(),
alt_port.val(),
Some(callback),
as_void_ptr(me.fat_handlers.borrow().connection.as_ref().unwrap()),
)
}.into_result();
match out {
Ok(_) => {
let mut out = me.ctx.take().expect("Internal context is empty, it must never happen");
out.consume_connection(me);
Ok(out)
},
Err(e) => {
me.fat_handlers.borrow_mut().connection = old_handler;
Err(ConnectError {
conn: me,
error: e,
})
}
}
}
pub fn open_stream_default(&self) -> Result<()> {
unsafe {
sys::xmpp_conn_open_stream_default(self.inner.as_ptr())
}.into_result()
}
pub fn open_stream(&self, attributes: &collections::HashMap<&str, &str>) -> Result<()> {
let mut storage = Vec::with_capacity(attributes.len() * 2);
let mut attrs = Vec::with_capacity(attributes.len() * 2);
for attr in attributes {
storage.push(FFI(*attr.0).send());
storage.push(FFI(*attr.1).send());
attrs.push(storage[storage.len() - 2].as_ptr() as _);
attrs.push(storage[storage.len() - 1].as_ptr() as _);
}
unsafe {
sys::xmpp_conn_open_stream(
self.inner.as_ptr(),
attrs.as_mut_ptr(),
attrs.len(),
)
}.into_result()
}
pub fn tls_start(&self) -> Result<()> {
unsafe {
sys::xmpp_conn_tls_start(self.inner.as_ptr())
}.into_result()
}
pub fn disconnect(&mut self) {
unsafe {
sys::xmpp_disconnect(self.inner.as_mut())
}
}
pub fn send_raw_string(&mut self, data: impl AsRef<str>) {
let data = FFI(data.as_ref()).send();
unsafe {
sys::xmpp_send_raw_string(self.inner.as_mut(), data.as_ptr());
}
}
pub fn send_raw(&mut self, data: impl AsRef<[u8]>) {
let data = data.as_ref();
unsafe {
sys::xmpp_send_raw(self.inner.as_mut(), data.as_ptr() as _, data.len());
}
}
pub fn send(&mut self, stanza: &Stanza) { unsafe { sys::xmpp_send(self.inner.as_mut(), stanza.as_inner()) } }
pub fn timed_handler_add<CB>(&mut self, handler: CB, period: Duration) -> Option<TimedHandlerId<'cb, 'cx, CB>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>) -> bool + Send + 'cb,
{
let callback = Self::timed_handler_cb::<CB>;
let handler = self.make_fat_handler(Box::new(handler) as _, callback as _, ());
Self::store_fat_handler(&mut self.fat_handlers.borrow_mut().timed, handler).map(|fat_handler_ptr| {
unsafe {
sys::xmpp_timed_handler_add(
self.inner.as_ptr(),
Some(callback),
period.as_millis() as raw::c_ulong,
fat_handler_ptr as _,
);
}
TimedHandlerId(fat_handler_ptr as _)
})
}
pub fn timed_handler_delete<CB>(&mut self, handler_id: TimedHandlerId<CB>) {
#![allow(clippy::needless_pass_by_value)]
unsafe {
sys::xmpp_timed_handler_delete(self.inner.as_mut(), Some(Self::timed_handler_cb::<CB>))
}
Self::drop_fat_handler(&mut self.fat_handlers.borrow_mut().timed, handler_id.0 as _);
}
pub fn timed_handlers_clear(&mut self) {
for handler in self.fat_handlers.borrow_mut().timed.drain(..) {
unsafe { sys::xmpp_timed_handler_delete(self.inner.as_mut(), Some(mem::transmute(handler.cb_addr))) };
}
self.fat_handlers.borrow_mut().timed.shrink_to_fit();
}
pub fn id_handler_add<CB>(&mut self, handler: CB, id: impl Into<String>) -> Option<IdHandlerId<'cb, 'cx, CB>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, &Stanza) -> bool + Send + 'cb,
{
let id = id.into();
let ffi_id = FFI(id.as_str()).send();
let callback = Self::handler_cb::<CB>;
let handler = self.make_fat_handler(
Box::new(handler) as _,
callback as _,
Some(id),
);
Self::store_fat_handler(&mut self.fat_handlers.borrow_mut().stanza, handler).map(|fat_handler_ptr| {
unsafe {
sys::xmpp_id_handler_add(
self.inner.as_ptr(),
Some(callback),
ffi_id.as_ptr(),
fat_handler_ptr as _,
);
}
IdHandlerId(fat_handler_ptr as _)
})
}
pub fn id_handler_delete<CB>(&mut self, handler_id: IdHandlerId<CB>) {
#![allow(clippy::needless_pass_by_value)]
if let Some(fat_handler) = Self::validate_fat_handler(&self.fat_handlers.borrow().stanza, handler_id.0 as _) {
let id = FFI(fat_handler.extra.as_ref().unwrap().as_str()).send();
unsafe {
sys::xmpp_id_handler_delete(self.inner.as_mut(), Some(Self::handler_cb::<CB>), id.as_ptr())
}
}
Self::drop_fat_handler(&mut self.fat_handlers.borrow_mut().stanza, handler_id.0 as _);
}
pub fn id_handlers_clear(&mut self) {
self.fat_handlers.borrow_mut().stanza.retain(|x| {
if let Some(ref id) = x.extra {
unsafe { sys::xmpp_id_handler_delete(self.inner.as_ptr(), Some(mem::transmute(x.cb_addr)), FFI(id.as_str()).send().as_ptr()) };
false
} else {
true
}
});
self.fat_handlers.borrow_mut().stanza.shrink_to_fit();
}
pub fn handler_add<CB>(&mut self, handler: CB, ns: Option<&str>, name: Option<&str>, typ: Option<&str>) -> Option<HandlerId<'cb, 'cx, CB>>
where
CB: FnMut(&Context<'cx, 'cb>, &mut Connection<'cb, 'cx>, &Stanza) -> bool + Send + 'cb,
{
let ns = FFI(ns).send();
let name = FFI(name).send();
let typ = FFI(typ).send();
let callback = Self::handler_cb::<CB>;
let handler = self.make_fat_handler(Box::new(handler) as _, callback as _, None);
Self::store_fat_handler(&mut self.fat_handlers.borrow_mut().stanza, handler).map(|fat_handler_ptr| {
unsafe {
sys::xmpp_handler_add(
self.inner.as_ptr(),
Some(callback),
ns.as_ptr(),
name.as_ptr(),
typ.as_ptr(),
fat_handler_ptr as _,
)
}
HandlerId(fat_handler_ptr as _)
})
}
pub fn handler_delete<CB>(&mut self, handler_id: HandlerId<CB>) {
#![allow(clippy::needless_pass_by_value)]
unsafe {
sys::xmpp_handler_delete(self.inner.as_mut(), Some(Self::handler_cb::<CB>))
}
Self::drop_fat_handler(&mut self.fat_handlers.borrow_mut().stanza, handler_id.0 as _);
}
pub fn handlers_clear(&mut self) {
self.fat_handlers.borrow_mut().stanza.retain(|x| {
if x.extra.is_none() {
unsafe { sys::xmpp_handler_delete(self.inner.as_ptr(), Some(mem::transmute(x.cb_addr))) };
false
} else {
true
}
});
self.fat_handlers.borrow_mut().stanza.shrink_to_fit();
}
}
impl PartialEq for Connection<'_, '_> {
fn eq(&self, other: &Connection) -> bool {
self.inner == other.inner
}
}
impl Eq for Connection<'_, '_> {}
impl Drop for Connection<'_, '_> {
fn drop(&mut self) {
if self.owned {
unsafe {
sys::xmpp_conn_release(self.inner.as_mut());
}
}
}
}
unsafe impl Send for Connection<'_, '_> {}
pub struct HandlerId<'cb, 'cx, CB>(*const FatHandler<'cb, 'cx, CB, ()>);
impl<CB> fmt::Debug for HandlerId<'_, '_, CB> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?}", self.0)
}
}
pub struct TimedHandlerId<'cb, 'cx, CB>(*const FatHandler<'cb, 'cx, CB, ()>);
impl<CB> fmt::Debug for TimedHandlerId<'_, '_, CB> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?}", self.0)
}
}
pub struct IdHandlerId<'cb, 'cx, CB>(*const FatHandler<'cb, 'cx, CB, Option<String>>);
impl<CB> fmt::Debug for IdHandlerId<'_, '_, CB> {
fn fmt(&self, f: &mut fmt::Formatter) -> result::Result<(), fmt::Error> {
write!(f, "{:?}", self.0)
}
}
pub struct FatHandler<'cb, 'cx, CB: ?Sized, T> {
fat_handlers: Weak<RefCell<FatHandlers<'cb, 'cx>>>,
handler: Box<CB>,
cb_addr: *const (),
extra: T,
}