use openiap_client::protos::{
AggregateRequest, CountRequest, DistinctRequest, DownloadRequest, Envelope, InsertOneRequest,
QueryRequest, SigninRequest, UploadRequest, WatchEvent, WatchRequest,
};
use openiap_client::{Client, QueueEvent, RegisterExchangeRequest, RegisterQueueRequest};
use std::collections::{HashMap, VecDeque};
use std::ffi::CStr;
use std::ffi::CString;
use std::os::raw::c_char;
use std::sync::Mutex;
use tokio::runtime::Runtime;
use tracing::debug;
use lazy_static::lazy_static;
lazy_static! {
    /// Process-wide FIFO queues of watch events, grouped under a string key.
    /// NOTE(review): the key is presumably the watch id - confirm against the code
    /// that fills this map.
    static ref WATCH_EVENTS: Mutex<HashMap<String, VecDeque<WatchEvent>>> =
        Mutex::new(HashMap::new());
    /// Process-wide FIFO queues of queue events, grouped under a string key.
    /// NOTE(review): the key is presumably the queue name - confirm against the code
    /// that fills this map.
    static ref QUEUE_EVENTS: Mutex<HashMap<String, VecDeque<QueueEvent>>> =
        Mutex::new(HashMap::new());
}
/// Handle handed to foreign callers by `connect` / `connect_async`.
///
/// Bundles the connection outcome with the Tokio runtime that drives all requests
/// issued through this handle.
#[allow(dead_code)]
#[repr(C)]
pub struct ClientWrapper {
/// `true` when the connection attempt succeeded.
success: bool,
/// Null on success; otherwise a NUL-terminated message produced with `CString::into_raw`.
error: *const c_char,
/// The connected client, or `None` when connecting failed.
client: Option<Client>,
/// Runtime used to block on or spawn the client's futures.
runtime: std::sync::Arc<Runtime>,
}
/// C-compatible view of a watch event: three NUL-terminated strings.
#[repr(C)]
#[derive(Debug, Clone)]
pub struct WatchEventWrapper {
    /// Event id, or null when unset.
    id: *const c_char,
    /// Operation name, or null when unset.
    operation: *const c_char,
    /// Document payload, or null when unset.
    document: *const c_char,
}

impl Default for WatchEventWrapper {
    /// Builds an "empty" event in which every string pointer is null.
    fn default() -> Self {
        let unset: *const c_char = std::ptr::null();
        Self {
            id: unset,
            operation: unset,
            document: unset,
        }
    }
}
/// C-compatible input for `query` / `query_async`.
///
/// Every field is copied into the `QueryRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct QueryRequestWrapper {
/// Forwarded as `QueryRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `QueryRequest::query`.
query: *const c_char,
/// Forwarded as `QueryRequest::projection`.
projection: *const c_char,
/// Forwarded as `QueryRequest::orderby`.
orderby: *const c_char,
/// Forwarded as `QueryRequest::queryas`.
queryas: *const c_char,
/// Forwarded as `QueryRequest::explain`.
explain: bool,
/// Forwarded as `QueryRequest::skip`.
skip: i32,
/// Forwarded as `QueryRequest::top`.
top: i32,
}
/// C-compatible result of `query` / `query_async`.
#[repr(C)]
pub struct QueryResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// Result string on success (from `CString::into_raw`), null on failure.
results: *const c_char,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
/// Runs a query and blocks the calling thread until the server has answered.
///
/// # Parameters
/// * `client` - handle obtained from `connect` / `connect_async`.
/// * `options` - query description; its strings are copied, null strings read as `""`.
///
/// # Returns
/// A heap-allocated [`QueryResponseWrapper`] (never null) that must be released with
/// `free_query_response`. A null `client` or a handle without a live connection reports
/// `"Client is not connected"`; a null `options` reports `"Invalid options"`.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn query(
    client: *mut ClientWrapper,
    options: *mut QueryRequestWrapper,
) -> *mut QueryResponseWrapper {
    // Copies a caller-owned C string. Null is read as "" and invalid UTF-8 is replaced
    // lossily, so malformed input cannot panic across the FFI boundary.
    fn text(ptr: *const c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and the caller guarantees it is NUL-terminated.
        unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned()
    }
    // Boxes a failed response carrying `message`.
    fn failure(message: &str) -> *mut QueryResponseWrapper {
        Box::into_raw(Box::new(QueryResponseWrapper {
            success: false,
            results: std::ptr::null(),
            error: CString::new(message).unwrap_or_default().into_raw(),
        }))
    }

    // SAFETY: each pointer is either null or points to a live, initialised struct.
    // Only shared access is needed, so no `&mut` (which could alias references held by
    // in-flight async requests) is created.
    let (wrapper, options) = match unsafe { (client.as_ref(), options.as_ref()) } {
        (Some(wrapper), Some(options)) => (wrapper, options),
        (None, _) => return failure("Client is not connected"),
        (_, None) => return failure("Invalid options"),
    };
    let client = match wrapper.client.as_ref() {
        Some(client) => client,
        None => return failure("Client is not connected"),
    };
    let request = QueryRequest {
        collectionname: text(options.collectionname),
        query: text(options.query),
        projection: text(options.projection),
        orderby: text(options.orderby),
        queryas: text(options.queryas),
        explain: options.explain,
        skip: options.skip,
        top: options.top,
        ..Default::default()
    };
    let outcome = wrapper
        .runtime
        .block_on(async { client.query(request).await });
    match outcome {
        Ok(data) => match CString::new(data.results) {
            Ok(results) => Box::into_raw(Box::new(QueryResponseWrapper {
                success: true,
                results: results.into_raw(),
                error: std::ptr::null(),
            })),
            // An interior NUL cannot be represented as a C string; report it.
            Err(e) => failure(&format!("Query failed: {:?}", e)),
        },
        Err(e) => failure(&format!("Query failed: {:?}", e)),
    }
}
/// Callback receiving the response of `query_async`; the callee owns the pointer and
/// must release it with `free_query_response`.
type QueryCallback = extern "C" fn(wrapper: *mut QueryResponseWrapper);

/// Starts a query on the handle's runtime and returns immediately.
///
/// # Parameters
/// * `client` - handle obtained from `connect` / `connect_async`. The spawned task keeps
///   a reference into it, so the handle must stay alive until `callback` has run.
/// * `options` - query description; its strings are copied before this function returns
///   (null strings read as `""`).
/// * `callback` - invoked exactly once with a heap-allocated response: from the spawned
///   task on completion, or synchronously on the calling thread when `client` is
///   null / not connected (`"Client is not connected"`) or `options` is null
///   (`"Invalid options"`).
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn query_async(
    client: *mut ClientWrapper,
    options: *mut QueryRequestWrapper,
    callback: QueryCallback,
) {
    // Copies a caller-owned C string. Null is read as "" and invalid UTF-8 is replaced
    // lossily, so malformed input cannot panic across the FFI boundary.
    fn text(ptr: *const c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and the caller guarantees it is NUL-terminated.
        unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned()
    }
    // Boxes a failed response carrying `message`.
    fn failure(message: &str) -> *mut QueryResponseWrapper {
        Box::into_raw(Box::new(QueryResponseWrapper {
            success: false,
            results: std::ptr::null(),
            error: CString::new(message).unwrap_or_default().into_raw(),
        }))
    }

    debug!("Rust: query_async");
    // SAFETY: each pointer is either null or points to a live, initialised struct.
    // Only shared access is needed, so no `&mut` (which could alias references held by
    // other in-flight requests) is created.
    let (wrapper, options) = match unsafe { (client.as_ref(), options.as_ref()) } {
        (Some(wrapper), Some(options)) => (wrapper, options),
        (None, _) => return callback(failure("Client is not connected")),
        (_, None) => return callback(failure("Invalid options")),
    };
    let client = match wrapper.client.as_ref() {
        Some(client) => client,
        None => return callback(failure("Client is not connected")),
    };
    let request = QueryRequest {
        collectionname: text(options.collectionname),
        query: text(options.query),
        projection: text(options.projection),
        orderby: text(options.orderby),
        queryas: text(options.queryas),
        explain: options.explain,
        skip: options.skip,
        top: options.top,
        ..Default::default()
    };
    debug!("Rust: runtime.spawn");
    wrapper.runtime.spawn(async move {
        debug!("Rust: client.query");
        let outcome = client.query(request).await;
        let response = match outcome {
            Ok(data) => match CString::new(data.results) {
                Ok(results) => Box::into_raw(Box::new(QueryResponseWrapper {
                    success: true,
                    results: results.into_raw(),
                    error: std::ptr::null(),
                })),
                // An interior NUL cannot be represented as a C string; report it.
                Err(e) => failure(&format!("Query failed: {:?}", e)),
            },
            Err(e) => failure(&format!("Query failed: {:?}", e)),
        };
        debug!("Rust: callback response");
        callback(response);
    });
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_query_response(response: *mut QueryResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-compatible input for `aggregate` / `aggregate_async`.
///
/// Every field is copied into the `AggregateRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct AggregateRequestWrapper {
/// Forwarded as `AggregateRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `AggregateRequest::aggregates`.
aggregates: *const c_char,
/// Forwarded as `AggregateRequest::queryas`.
queryas: *const c_char,
/// Forwarded as `AggregateRequest::hint`.
hint: *const c_char,
/// Forwarded as `AggregateRequest::explain`.
explain: bool,
}
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
/// Installs a global `tracing` subscriber that writes formatted events.
///
/// # Parameters
/// * `rust_log` - filter directives passed to `EnvFilter::new`. When empty (or null) the
///   filter is taken from the environment via `EnvFilter::from_default_env`.
/// * `tracing` - which span events to log, case-insensitive: `new`, `enter`, `exit`,
///   `close`, `none`, `active` or `full`. Any other value (including empty or null)
///   leaves the formatter's default untouched.
///
/// If a global subscriber is already installed the failure is printed to stderr.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn enable_tracing(rust_log: *const c_char, tracing: *const c_char) {
    use tracing_subscriber::fmt::format::FmtSpan;

    // Copies a caller-owned C string. Null is read as "" and invalid UTF-8 is replaced
    // lossily, so malformed input cannot panic across the FFI boundary.
    fn text(ptr: *const c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and the caller guarantees it is NUL-terminated.
        unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned()
    }

    let rust_log = text(rust_log);
    let filter = if rust_log.is_empty() {
        EnvFilter::from_default_env()
    } else {
        EnvFilter::new(rust_log)
    };
    let span_events = match text(tracing).to_lowercase().as_str() {
        "new" => Some(FmtSpan::NEW),
        "enter" => Some(FmtSpan::ENTER),
        "exit" => Some(FmtSpan::EXIT),
        "close" => Some(FmtSpan::CLOSE),
        "none" => Some(FmtSpan::NONE),
        "active" => Some(FmtSpan::ACTIVE),
        "full" => Some(FmtSpan::FULL),
        _ => None,
    };
    let mut subscriber = fmt::layer();
    if let Some(events) = span_events {
        subscriber = subscriber.with_span_events(events);
    }
    let subscriber = subscriber
        .and_then(filter)
        .with_subscriber(tracing_subscriber::registry());
    match tracing::subscriber::set_global_default(subscriber) {
        Ok(()) => {
            debug!("Tracing enabled");
        }
        Err(e) => {
            eprintln!("Tracing failed: {:?}", e);
        }
    }
}
/// Exported counterpart of `enable_tracing`.
///
/// The body is currently empty: calling it does not remove or change the installed
/// subscriber.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn disable_tracing() {
}
/// Connects to `server_address`, blocking until the attempt has finished.
///
/// A fresh Tokio runtime is created for the connection and stored in the returned handle.
///
/// # Parameters
/// * `server_address` - NUL-terminated address string; null is read as `""`.
///
/// # Returns
/// A heap-allocated [`ClientWrapper`] (never null). On failure `success` is `false`,
/// `client` is `None` and `error` holds `"Connection failed: ..."`.
///
/// # Panics
/// Panics if the Tokio runtime cannot be created.
#[no_mangle]
pub extern "C" fn connect(server_address: *const c_char) -> *mut ClientWrapper {
    let server_address = if server_address.is_null() {
        String::new()
    } else {
        // SAFETY: non-null; the caller guarantees a NUL-terminated string.
        unsafe { CStr::from_ptr(server_address) }
            .to_string_lossy()
            .into_owned()
    };
    let runtime = std::sync::Arc::new(Runtime::new().unwrap());
    let outcome = runtime.block_on(Client::connect(server_address.as_str()));
    let wrapper = match outcome {
        Ok(client) => ClientWrapper {
            client: Some(client),
            runtime,
            success: true,
            error: std::ptr::null(),
        },
        Err(e) => ClientWrapper {
            client: None,
            runtime,
            success: false,
            error: CString::new(format!("Connection failed: {:?}", e))
                .unwrap_or_default()
                .into_raw(),
        },
    };
    Box::into_raw(Box::new(wrapper))
}
/// Callback receiving the handle produced by `connect_async`.
type ConnectCallback = extern "C" fn(wrapper: *mut ClientWrapper);

/// Connects to `server_address` on a freshly created Tokio runtime and reports the
/// outcome through `callback`.
///
/// # Parameters
/// * `server_address` - NUL-terminated address string, copied before the task is
///   spawned; null is read as `""`.
/// * `callback` - invoked from the spawned task with a heap-allocated
///   [`ClientWrapper`]. On failure `success` is `false` and `error` holds
///   `"Connection failed: ..."`.
///
/// # Panics
/// Panics if the Tokio runtime cannot be created.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn connect_async(server_address: *const c_char, callback: ConnectCallback) {
    debug!("rust::connect_async");
    let server_address = if server_address.is_null() {
        String::new()
    } else {
        // SAFETY: non-null; the caller guarantees a NUL-terminated string.
        unsafe { CStr::from_ptr(server_address) }
            .to_string_lossy()
            .into_owned()
    };
    let runtime = std::sync::Arc::new(Runtime::new().unwrap());
    debug!("rust::Spawn the async task");
    // The clone travels into the task and ends up inside the returned handle, which
    // keeps the runtime alive after this function's own `Arc` is dropped.
    let runtime_clone = std::sync::Arc::clone(&runtime);
    runtime.spawn(async move {
        debug!("rust::Simulated async task started");
        let client_result = Client::connect(&server_address).await;
        debug!("rust::Client::connect::done");
        let wrapper = match client_result {
            Ok(client) => ClientWrapper {
                client: Some(client),
                runtime: runtime_clone,
                success: true,
                error: std::ptr::null(),
            },
            Err(e) => ClientWrapper {
                client: None,
                runtime: runtime_clone,
                success: false,
                error: CString::new(format!("Connection failed: {:?}", e))
                    .unwrap_or_default()
                    .into_raw(),
            },
        };
        debug!("rust::Client::Calling callback with result");
        callback(Box::into_raw(Box::new(wrapper)));
    });
    // NOTE(review): this blocks the caller for two seconds on every call; it looks like
    // a grace period for the spawned task - confirm whether callers still rely on it.
    std::thread::sleep(std::time::Duration::from_secs(2));
}
#[allow(dead_code)]
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_client(response: *mut ClientWrapper) {
if response.is_null() {
debug!("free_client: response is null");
return;
}
unsafe {
let response_ref: &ClientWrapper = &*response;
if !response_ref.error.is_null() {
let error_cstr = CStr::from_ptr(response_ref.error);
if let Ok(error_str) = error_cstr.to_str() {
debug!("free_client: error = {}", error_str);
} else {
debug!("free_client: error = <invalid UTF-8>");
}
}
if let Some(client) = &response_ref.client {
let runtime = &response_ref.runtime;
runtime.block_on(async move {
{
let inner = client.inner.lock().await;
let mut queries = inner.queries.lock().await;
for (id, response_tx) in queries.drain() {
debug!("free_client: canceling request with id: {:?}", id);
let _ = response_tx.send(Envelope {
command: "cancelled".to_string(),
..Default::default()
});
}
} {
let inner = client.inner.lock().await;
let mut streams = inner.streams.lock().await;
let stream_keys = streams.keys().cloned().collect::<Vec<String>>();
stream_keys.iter().for_each(|k| {
debug!("free_client: client inner state: stream: {:?}", k);
streams.remove(k.clone().as_str());
});
} });
}
}
debug!("free_client::complete");
}
/// C-compatible input for `signin` / `signin_async`.
///
/// Every field is copied into the `SigninRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct SigninRequestWrapper {
/// Forwarded as `SigninRequest::username`.
username: *const c_char,
/// Forwarded as `SigninRequest::password`.
password: *const c_char,
/// Forwarded as `SigninRequest::jwt`.
jwt: *const c_char,
/// Forwarded as `SigninRequest::agent`.
agent: *const c_char,
/// Forwarded as `SigninRequest::version`.
version: *const c_char,
/// Forwarded as `SigninRequest::longtoken`.
longtoken: bool,
/// Forwarded as `SigninRequest::validateonly`.
validateonly: bool,
/// Forwarded as `SigninRequest::ping`.
ping: bool,
}
/// C-compatible result of `signin` / `signin_async`.
#[repr(C)]
pub struct SigninResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// The `jwt` returned by the server on success (from `CString::into_raw`), null on failure.
jwt: *const c_char,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn signin(
client: *mut ClientWrapper,
options: *mut SigninRequestWrapper,
) -> *mut SigninResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = SigninRequest {
username: unsafe { CStr::from_ptr(options.username).to_str().unwrap() }.to_string(),
password: unsafe { CStr::from_ptr(options.password).to_str().unwrap() }.to_string(),
jwt: unsafe { CStr::from_ptr(options.jwt).to_str().unwrap() }.to_string(),
agent: unsafe { CStr::from_ptr(options.agent).to_str().unwrap() }.to_string(),
version: unsafe { CStr::from_ptr(options.version).to_str().unwrap() }.to_string(),
longtoken: options.longtoken,
ping: options.ping,
validateonly: options.validateonly,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = SigninResponseWrapper {
success: false,
jwt: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client.as_ref().unwrap().signin(request).await
});
let response = match result {
Ok(data) => {
let jwt = CString::new(data.jwt).unwrap().into_raw();
SigninResponseWrapper {
success: true,
jwt,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Signin failed: {:?}", e))
.unwrap()
.into_raw();
SigninResponseWrapper {
success: false,
jwt: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
type SigninCallback = extern "C" fn(wrapper: *mut SigninResponseWrapper);
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn signin_async(
client: *mut ClientWrapper,
options: *mut SigninRequestWrapper,
callback: SigninCallback,
) {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = SigninRequest {
username: unsafe { CStr::from_ptr(options.username).to_str().unwrap() }.to_string(),
password: unsafe { CStr::from_ptr(options.password).to_str().unwrap() }.to_string(),
jwt: unsafe { CStr::from_ptr(options.jwt).to_str().unwrap() }.to_string(),
agent: unsafe { CStr::from_ptr(options.agent).to_str().unwrap() }.to_string(),
version: unsafe { CStr::from_ptr(options.version).to_str().unwrap() }.to_string(),
longtoken: options.longtoken,
ping: options.ping,
validateonly: options.validateonly,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = SigninResponseWrapper {
success: false,
jwt: std::ptr::null(),
error: error_msg,
};
return callback(Box::into_raw(Box::new(response)));
}
runtime.spawn(async move {
let result = client.as_ref().unwrap().signin(request).await;
let response = match result {
Ok(data) => {
let jwt = CString::new(data.jwt).unwrap().into_raw();
Box::new(SigninResponseWrapper {
success: true,
jwt,
error: std::ptr::null(),
})
}
Err(e) => {
let error_msg = CString::new(format!("Signin failed: {:?}", e))
.unwrap()
.into_raw();
Box::new(SigninResponseWrapper {
success: false,
jwt: std::ptr::null(),
error: error_msg,
})
}
};
callback(Box::into_raw(response));
});
std::thread::sleep(std::time::Duration::from_secs(2));
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_signin_response(response: *mut SigninResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-compatible result of `aggregate` / `aggregate_async`.
#[repr(C)]
pub struct AggregateResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// Result string on success (from `CString::into_raw`), null on failure.
results: *const c_char,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn aggregate(
client: *mut ClientWrapper,
options: *mut AggregateRequestWrapper,
) -> *mut AggregateResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = AggregateRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
aggregates: unsafe { CStr::from_ptr(options.aggregates).to_str().unwrap() }.to_string(),
queryas: unsafe { CStr::from_ptr(options.queryas).to_str().unwrap() }.to_string(),
hint: unsafe { CStr::from_ptr(options.hint).to_str().unwrap() }.to_string(),
explain: options.explain,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = AggregateResponseWrapper {
success: false,
results: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client.as_ref().unwrap().aggregate(request).await
});
let response = match result {
Ok(data) => {
let results = CString::new(data.results).unwrap().into_raw();
AggregateResponseWrapper {
success: true,
results,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Aggregate failed: {:?}", e))
.unwrap()
.into_raw();
AggregateResponseWrapper {
success: false,
results: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
type AggregateCallback = extern "C" fn(wrapper: *mut AggregateResponseWrapper);
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn aggregate_async(
client: *mut ClientWrapper,
options: *mut AggregateRequestWrapper,
callback: AggregateCallback,
) {
debug!("Rust: aggregate_async");
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = AggregateRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
aggregates: unsafe { CStr::from_ptr(options.aggregates).to_str().unwrap() }.to_string(),
queryas: unsafe { CStr::from_ptr(options.queryas).to_str().unwrap() }.to_string(),
hint: unsafe { CStr::from_ptr(options.hint).to_str().unwrap() }.to_string(),
explain: options.explain,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = AggregateResponseWrapper {
success: false,
results: std::ptr::null(),
error: error_msg,
};
return callback(Box::into_raw(Box::new(response)));
}
debug!("Rust: runtime.spawn");
runtime.spawn(async move {
debug!("Rust: client.aggregate");
let result = client.as_ref().unwrap().aggregate(request).await;
let response = match result {
Ok(data) => {
let results = CString::new(data.results).unwrap().into_raw();
AggregateResponseWrapper {
success: true,
results,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Aggregate failed: {:?}", e))
.unwrap()
.into_raw();
AggregateResponseWrapper {
success: false,
results: std::ptr::null(),
error: error_msg,
}
}
};
debug!("Rust: callback response");
callback(Box::into_raw(Box::new(response)));
});
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_aggregate_response(response: *mut AggregateResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-compatible input for `count` / `count_async`.
///
/// Every field is copied into the `CountRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct CountRequestWrapper {
/// Forwarded as `CountRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `CountRequest::query`.
query: *const c_char,
/// Forwarded as `CountRequest::queryas`.
queryas: *const c_char,
/// Forwarded as `CountRequest::explain`.
explain: bool,
}
/// C-compatible result of `count` / `count_async`.
#[repr(C)]
pub struct CountResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// The `result` value returned by the server on success; `0` on failure.
result: i32,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn count(
client: *mut ClientWrapper,
options: *mut CountRequestWrapper,
) -> *mut CountResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = CountRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
query: unsafe { CStr::from_ptr(options.query).to_str().unwrap() }.to_string(),
queryas: unsafe { CStr::from_ptr(options.queryas).to_str().unwrap() }.to_string(),
explain: options.explain,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = CountResponseWrapper {
success: false,
result: 0,
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client.as_ref().unwrap().count(request).await
});
let response = match result {
Ok(data) => {
let result = data.result;
CountResponseWrapper {
success: true,
result,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Count failed: {:?}", e))
.unwrap()
.into_raw();
CountResponseWrapper {
success: false,
result: 0,
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
type CountCallback = extern "C" fn(wrapper: *mut CountResponseWrapper);
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn count_async(
client: *mut ClientWrapper,
options: *mut CountRequestWrapper,
callback: CountCallback,
) {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = CountRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
query: unsafe { CStr::from_ptr(options.query).to_str().unwrap() }.to_string(),
queryas: unsafe { CStr::from_ptr(options.queryas).to_str().unwrap() }.to_string(),
explain: options.explain,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = CountResponseWrapper {
success: false,
result: 0,
error: error_msg,
};
return callback(Box::into_raw(Box::new(response)));
}
runtime.spawn(async move {
let result = client.as_ref().unwrap().count(request).await;
let response = match result {
Ok(data) => {
let result = data.result;
CountResponseWrapper {
success: true,
result,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Count failed: {:?}", e))
.unwrap()
.into_raw();
CountResponseWrapper {
success: false,
result: 0,
error: error_msg,
}
}
};
callback(Box::into_raw(Box::new(response)));
});
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_count_response(response: *mut CountResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-compatible input for `distinct` / `distinct_async`.
///
/// Every field is copied into the `DistinctRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct DistinctRequestWrapper {
/// Forwarded as `DistinctRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `DistinctRequest::field`.
field: *const c_char,
/// Forwarded as `DistinctRequest::query`.
query: *const c_char,
/// Forwarded as `DistinctRequest::queryas`.
queryas: *const c_char,
/// Forwarded as `DistinctRequest::explain`.
explain: bool,
}
/// C-compatible result of `distinct` / `distinct_async`.
#[repr(C)]
pub struct DistinctResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// Array of `results_count` NUL-terminated strings on success, null on failure.
/// The array is a leaked boxed slice; release it via `free_distinct_response`.
results: *mut *const c_char,
/// Number of entries in `results`.
results_count: usize,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
/// Runs a distinct request and blocks the calling thread until the server has answered.
///
/// # Parameters
/// * `client` - handle obtained from `connect` / `connect_async`.
/// * `options` - request description; its strings are copied, null strings read as `""`.
///
/// # Returns
/// A heap-allocated [`DistinctResponseWrapper`] (never null) that must be released with
/// `free_distinct_response`. On success `results` points to `results_count` C strings.
/// A null `client` or a handle without a live connection reports
/// `"Client is not connected"`; a null `options` reports `"Invalid options"`.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn distinct(
    client: *mut ClientWrapper,
    options: *mut DistinctRequestWrapper,
) -> *mut DistinctResponseWrapper {
    // Copies a caller-owned C string. Null is read as "" and invalid UTF-8 is replaced
    // lossily, so malformed input cannot panic across the FFI boundary.
    fn text(ptr: *const c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and the caller guarantees it is NUL-terminated.
        unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned()
    }
    // Boxes a failed response carrying `message`.
    fn failure(message: &str) -> *mut DistinctResponseWrapper {
        Box::into_raw(Box::new(DistinctResponseWrapper {
            success: false,
            results: std::ptr::null_mut(),
            results_count: 0,
            error: CString::new(message).unwrap_or_default().into_raw(),
        }))
    }

    // SAFETY: each pointer is either null or points to a live, initialised struct.
    // Only shared access is needed, so no `&mut` (which could alias references held by
    // in-flight async requests) is created.
    let (wrapper, options) = match unsafe { (client.as_ref(), options.as_ref()) } {
        (Some(wrapper), Some(options)) => (wrapper, options),
        (None, _) => return failure("Client is not connected"),
        (_, None) => return failure("Invalid options"),
    };
    let client = match wrapper.client.as_ref() {
        Some(client) => client,
        None => return failure("Client is not connected"),
    };
    let request = DistinctRequest {
        collectionname: text(options.collectionname),
        field: text(options.field),
        query: text(options.query),
        queryas: text(options.queryas),
        explain: options.explain,
        ..Default::default()
    };
    let outcome = wrapper
        .runtime
        .block_on(async { client.distinct(request).await });
    match outcome {
        Ok(data) => {
            let converted: Result<Vec<CString>, _> = data
                .results
                .iter()
                .map(|s| CString::new(s.as_str()))
                .collect();
            match converted {
                Ok(strings) => {
                    // `into_raw` hands each string to the caller without leaking the
                    // intermediate vector; `free_distinct_response` reclaims them.
                    let entries: Box<[*const c_char]> = strings
                        .into_iter()
                        .map(|s| s.into_raw() as *const c_char)
                        .collect::<Vec<_>>()
                        .into_boxed_slice();
                    let results_count = entries.len();
                    let results = Box::into_raw(entries) as *mut *const c_char;
                    Box::into_raw(Box::new(DistinctResponseWrapper {
                        success: true,
                        results,
                        results_count,
                        error: std::ptr::null(),
                    }))
                }
                // An interior NUL cannot be represented as a C string; report it.
                Err(e) => failure(&format!("Distinct failed: {:?}", e)),
            }
        }
        Err(e) => failure(&format!("Distinct failed: {:?}", e)),
    }
}
/// Callback receiving the response of `distinct_async`; the callee owns the pointer and
/// must release it with `free_distinct_response`.
type DistinctCallback = extern "C" fn(wrapper: *mut DistinctResponseWrapper);

/// Starts a distinct request on the handle's runtime and returns immediately.
///
/// # Parameters
/// * `client` - handle obtained from `connect` / `connect_async`. The spawned task keeps
///   a reference into it, so the handle must stay alive until `callback` has run.
/// * `options` - request description; its strings are copied before this function
///   returns (null strings read as `""`).
/// * `callback` - invoked exactly once with a heap-allocated response: from the spawned
///   task on completion, or synchronously on the calling thread when `client` is
///   null / not connected (`"Client is not connected"`) or `options` is null
///   (`"Invalid options"`).
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn distinct_async(
    client: *mut ClientWrapper,
    options: *mut DistinctRequestWrapper,
    callback: DistinctCallback,
) {
    // Copies a caller-owned C string. Null is read as "" and invalid UTF-8 is replaced
    // lossily, so malformed input cannot panic across the FFI boundary.
    fn text(ptr: *const c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and the caller guarantees it is NUL-terminated.
        unsafe { CStr::from_ptr(ptr) }.to_string_lossy().into_owned()
    }
    // Boxes a failed response carrying `message`.
    fn failure(message: &str) -> *mut DistinctResponseWrapper {
        Box::into_raw(Box::new(DistinctResponseWrapper {
            success: false,
            results: std::ptr::null_mut(),
            results_count: 0,
            error: CString::new(message).unwrap_or_default().into_raw(),
        }))
    }

    // SAFETY: each pointer is either null or points to a live, initialised struct.
    // Only shared access is needed, so no `&mut` (which could alias references held by
    // other in-flight requests) is created.
    let (wrapper, options) = match unsafe { (client.as_ref(), options.as_ref()) } {
        (Some(wrapper), Some(options)) => (wrapper, options),
        (None, _) => return callback(failure("Client is not connected")),
        (_, None) => return callback(failure("Invalid options")),
    };
    let client = match wrapper.client.as_ref() {
        Some(client) => client,
        None => return callback(failure("Client is not connected")),
    };
    let request = DistinctRequest {
        collectionname: text(options.collectionname),
        field: text(options.field),
        query: text(options.query),
        queryas: text(options.queryas),
        explain: options.explain,
        ..Default::default()
    };
    wrapper.runtime.spawn(async move {
        let outcome = client.distinct(request).await;
        let response = match outcome {
            Ok(data) => {
                let converted: Result<Vec<CString>, _> = data
                    .results
                    .iter()
                    .map(|s| CString::new(s.as_str()))
                    .collect();
                match converted {
                    Ok(strings) => {
                        // `into_raw` hands each string to the caller without leaking the
                        // intermediate vector; `free_distinct_response` reclaims them.
                        let entries: Box<[*const c_char]> = strings
                            .into_iter()
                            .map(|s| s.into_raw() as *const c_char)
                            .collect::<Vec<_>>()
                            .into_boxed_slice();
                        debug!("Rust: results_array: {:?}", entries.as_ptr());
                        for (i, ptr) in entries.iter().enumerate() {
                            // SAFETY: every entry was just produced by `CString::into_raw`.
                            debug!("Rust: results_ptrs[{}]: {:?}: {:?}", i, ptr, unsafe {
                                CStr::from_ptr(*ptr)
                            }
                            .to_string_lossy());
                        }
                        let results_count = entries.len();
                        let results = Box::into_raw(entries) as *mut *const c_char;
                        Box::into_raw(Box::new(DistinctResponseWrapper {
                            success: true,
                            results,
                            results_count,
                            error: std::ptr::null(),
                        }))
                    }
                    // An interior NUL cannot be represented as a C string; report it.
                    Err(e) => failure(&format!("Distinct failed: {:?}", e)),
                }
            }
            Err(e) => failure(&format!("Distinct failed: {:?}", e)),
        };
        callback(response);
    });
}
/// Releases a response returned by `distinct` or delivered by `distinct_async`.
///
/// Frees every string in `results`, the pointer array itself, the `error` string and
/// finally the struct. Passing null is a no-op; nothing reachable from the response may
/// be used afterwards.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_distinct_response(response: *mut DistinctResponseWrapper) {
    if response.is_null() {
        return;
    }
    // SAFETY: `response` is non-null and was produced by `Box::into_raw` in this
    // library. `results` is a leaked `Box<[*const c_char]>` of exactly `results_count`
    // entries, so it must be rebuilt as a slice of that length: rebuilding it as a
    // single-element box would deallocate with the wrong layout, and for an empty
    // slice would free a dangling pointer.
    unsafe {
        let response = Box::from_raw(response);
        if !response.results.is_null() {
            let entries = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
                response.results,
                response.results_count,
            ));
            for &entry in entries.iter() {
                if !entry.is_null() {
                    drop(CString::from_raw(entry as *mut c_char));
                }
            }
        }
        if !response.error.is_null() {
            drop(CString::from_raw(response.error as *mut c_char));
        }
    }
}
/// C-compatible input for `insert_one` / `insert_one_async`.
///
/// Every field is copied into the `InsertOneRequest` field of the same name; the string
/// fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct InsertOneRequestWrapper {
/// Forwarded as `InsertOneRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `InsertOneRequest::item`.
item: *const c_char,
/// Forwarded as `InsertOneRequest::w`.
w: i32,
/// Forwarded as `InsertOneRequest::j`.
j: bool,
}
/// C-compatible result of `insert_one` / `insert_one_async`.
#[repr(C)]
pub struct InsertOneResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// The `result` string returned by the server on success (from `CString::into_raw`),
/// null on failure.
result: *const c_char,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn insert_one(
client: *mut ClientWrapper,
options: *mut InsertOneRequestWrapper,
) -> *mut InsertOneResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = InsertOneRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
item: unsafe { CStr::from_ptr(options.item).to_str().unwrap() }.to_string(),
w: options.w,
j: options.j,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = InsertOneResponseWrapper {
success: false,
result: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client.as_ref().unwrap().insert_one(request).await
});
let response = match result {
Ok(data) => {
let result = CString::new(data.result).unwrap().into_raw();
InsertOneResponseWrapper {
success: true,
result,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("InsertOne failed: {:?}", e))
.unwrap()
.into_raw();
InsertOneResponseWrapper {
success: false,
result: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
type InsertOneCallback = extern "C" fn(wrapper: *mut InsertOneResponseWrapper);
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn insert_one_async(
client: *mut ClientWrapper,
options: *mut InsertOneRequestWrapper,
callback: InsertOneCallback,
) {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = InsertOneRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
item: unsafe { CStr::from_ptr(options.item).to_str().unwrap() }.to_string(),
w: options.w,
j: options.j,
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = InsertOneResponseWrapper {
success: false,
result: std::ptr::null(),
error: error_msg,
};
return callback(Box::into_raw(Box::new(response)));
}
runtime.spawn(async move {
let result = client.as_ref().unwrap().insert_one(request).await;
let response = match result {
Ok(data) => {
let result = CString::new(data.result).unwrap().into_raw();
InsertOneResponseWrapper {
success: true,
result,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("InsertOne failed: {:?}", e))
.unwrap()
.into_raw();
InsertOneResponseWrapper {
success: false,
result: std::ptr::null(),
error: error_msg,
}
}
};
callback(Box::into_raw(Box::new(response)));
});
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_insert_one_response(response: *mut InsertOneResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-compatible input for `download` / `download_async`.
///
/// All fields are NUL-terminated C strings owned by the caller.
#[repr(C)]
pub struct DownloadRequestWrapper {
/// Forwarded as `DownloadRequest::collectionname`.
collectionname: *const c_char,
/// Forwarded as `DownloadRequest::id`.
id: *const c_char,
/// Passed to `Client::download` as its folder argument.
folder: *const c_char,
/// Forwarded as `DownloadRequest::filename` and also passed to `Client::download`
/// as its filename argument.
filename: *const c_char,
}
/// C-compatible result of `download` / `download_async`.
#[repr(C)]
pub struct DownloadResponseWrapper {
/// `true` when the request succeeded.
success: bool,
/// The `filename` reported by the client on success (from `CString::into_raw`),
/// null on failure.
filename: *const c_char,
/// Error message on failure (from `CString::into_raw`), null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn download(
client: *mut ClientWrapper,
options: *mut DownloadRequestWrapper,
) -> *mut DownloadResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let folder = unsafe { CStr::from_ptr(options.folder).to_str().unwrap() };
let filename = unsafe { CStr::from_ptr(options.filename).to_str().unwrap() };
let request = DownloadRequest {
collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
.to_string(),
filename: unsafe { CStr::from_ptr(options.filename).to_str().unwrap() }.to_string(),
id: unsafe { CStr::from_ptr(options.id).to_str().unwrap() }.to_string(),
..Default::default()
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = DownloadResponseWrapper {
success: false,
filename: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client
.as_ref()
.unwrap()
.download(request, Some(folder), Some(filename))
.await
});
let response = match result {
Ok(data) => {
let filename = CString::new(data.filename).unwrap().into_raw();
DownloadResponseWrapper {
success: true,
filename,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("Download failed: {:?}", e))
.unwrap()
.into_raw();
DownloadResponseWrapper {
success: false,
filename: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
/// C callback that receives the heap-allocated result of `download_async`.
type DownloadCallback = extern "C" fn(wrapper: *mut DownloadResponseWrapper);

/// Starts a download on the client's runtime and reports the outcome via `callback`.
///
/// `options` supplies the collection name, the file id and the target
/// `folder` / `filename`. Every string is copied into an owned value before the
/// task is spawned, so the caller may release `options` as soon as this
/// function returns.
///
/// `callback` is invoked exactly once with a `DownloadResponseWrapper` created
/// by `Box::into_raw`: `success` is `false` and `error` is set when the client
/// is not connected or the download fails; otherwise `filename` holds the name
/// reported by the client. Release it with `free_download_response`.
///
/// # Safety
/// `client` and `options` must be valid, non-null pointers and every string
/// field of `options` must point to a NUL-terminated string.
///
/// # Panics
/// Panics if any string in `options` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn download_async(
    client: *mut ClientWrapper,
    options: *mut DownloadRequestWrapper,
    callback: DownloadCallback,
) {
    let options = unsafe { &*options };
    let client_wrapper = unsafe { &mut *client };
    let client = &client_wrapper.client;
    let runtime = &client_wrapper.runtime;
    // Owned copies: the spawned task outlives this call, while the C strings
    // behind `options` belong to the caller and may be freed once we return.
    let folder = unsafe { CStr::from_ptr(options.folder).to_str().unwrap() }.to_string();
    let filename = unsafe { CStr::from_ptr(options.filename).to_str().unwrap() }.to_string();
    let request = DownloadRequest {
        collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
            .to_string(),
        filename: filename.clone(),
        id: unsafe { CStr::from_ptr(options.id).to_str().unwrap() }.to_string(),
        ..Default::default()
    };
    if client.is_none() {
        let error_msg = CString::new("Client is not connected").unwrap().into_raw();
        let response = DownloadResponseWrapper {
            success: false,
            filename: std::ptr::null(),
            error: error_msg,
        };
        return callback(Box::into_raw(Box::new(response)));
    }
    // NOTE(review): `client` is still borrowed from the caller-owned
    // `ClientWrapper`, so the wrapper must stay alive until the callback fires.
    runtime.spawn(async move {
        let result = client
            .as_ref()
            .unwrap()
            .download(request, Some(folder.as_str()), Some(filename.as_str()))
            .await;
        let response = match result {
            Ok(data) => {
                let filename = CString::new(data.filename).unwrap().into_raw();
                DownloadResponseWrapper {
                    success: true,
                    filename,
                    error: std::ptr::null(),
                }
            }
            Err(e) => {
                let error_msg = CString::new(format!("Download failed: {:?}", e))
                    .unwrap()
                    .into_raw();
                DownloadResponseWrapper {
                    success: false,
                    filename: std::ptr::null(),
                    error: error_msg,
                }
            }
        };
        callback(Box::into_raw(Box::new(response)));
    });
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_download_response(response: *mut DownloadResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout options consumed by `upload` and `upload_async`.
///
/// Every field is read as a NUL-terminated C string and must be valid UTF-8;
/// the upload functions unwrap the conversion and panic otherwise.
#[repr(C)]
pub struct UploadRequestWrapper {
/// Path of the local file to upload; rejected when empty.
filepath: *const c_char,
/// File name placed in the upload request; rejected when empty.
filename: *const c_char,
/// Copied verbatim into the request's `mimetype`.
mimetype: *const c_char,
/// Copied verbatim into the request's `metadata`.
metadata: *const c_char,
/// Copied verbatim into the request's `collectionname`.
collectionname: *const c_char,
}
/// C-layout result of `upload` / `upload_async`.
///
/// Instances are handed to C as `Box::into_raw` pointers; release the wrapper
/// with `free_upload_response`.
#[repr(C)]
pub struct UploadResponseWrapper {
/// `true` when the upload completed, `false` on any failure.
success: bool,
/// Id returned by the client for the uploaded file; null on failure.
id: *const c_char,
/// Human-readable failure description; null on success.
error: *const c_char,
}
/// Uploads a local file synchronously, blocking on the client's runtime.
///
/// Validation happens in this order: `filepath` must be non-empty, `filename`
/// must be non-empty, and the client must be connected. Each failure yields a
/// response with `success == false` and a descriptive `error`.
///
/// Returns a `Box::into_raw` pointer to an `UploadResponseWrapper`; on success
/// `id` holds the id reported by the client.
///
/// # Panics
/// Panics if any string in `options` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn upload(
    client: *mut ClientWrapper,
    options: *mut UploadRequestWrapper,
) -> *mut UploadResponseWrapper {
    // Boxes a failed response carrying `message` and hands ownership to C.
    fn failed(message: &str) -> *mut UploadResponseWrapper {
        Box::into_raw(Box::new(UploadResponseWrapper {
            success: false,
            id: std::ptr::null(),
            error: CString::new(message).unwrap().into_raw(),
        }))
    }

    let opts = unsafe { &*options };
    let wrapper = unsafe { &mut *client };
    let rt = &wrapper.runtime;

    let local_path = unsafe { CStr::from_ptr(opts.filepath) }.to_str().unwrap();
    if local_path.is_empty() {
        return failed("Filepath is required");
    }
    let local_path = local_path.to_string();
    debug!("Rust::upload: filepath: {}", local_path);

    let name = unsafe { CStr::from_ptr(opts.filename) }.to_str().unwrap();
    if name.is_empty() {
        return failed("Filename is required");
    }

    // Built before the connectivity check so invalid strings are detected first.
    let request = UploadRequest {
        filename: name.to_string(),
        mimetype: unsafe { CStr::from_ptr(opts.mimetype) }
            .to_str()
            .unwrap()
            .to_string(),
        metadata: unsafe { CStr::from_ptr(opts.metadata) }
            .to_str()
            .unwrap()
            .to_string(),
        collectionname: unsafe { CStr::from_ptr(opts.collectionname) }
            .to_str()
            .unwrap()
            .to_string(),
        ..Default::default()
    };

    let connected = match wrapper.client.as_ref() {
        Some(connected) => connected,
        None => return failed("Client is not connected"),
    };

    debug!("Rust::upload: runtime.block_on");
    let outcome = rt.block_on(async { connected.upload(request, &local_path).await });
    match outcome {
        Ok(data) => Box::into_raw(Box::new(UploadResponseWrapper {
            success: true,
            id: CString::new(data.id).unwrap().into_raw(),
            error: std::ptr::null(),
        })),
        Err(e) => failed(&format!("Upload failed: {:?}", e)),
    }
}
/// C callback that receives the heap-allocated result of `upload_async`.
type UploadCallback = extern "C" fn(wrapper: *mut UploadResponseWrapper);

/// Uploads a local file on the client's runtime and reports through `callback`.
///
/// Validation failures (empty `filepath`, empty `filename`, client not
/// connected) invoke `callback` immediately on the calling thread. Otherwise
/// the upload is spawned and `callback` is invoked from the task once the
/// upload has finished.
///
/// The response passed to `callback` is a `Box::into_raw` pointer to an
/// `UploadResponseWrapper`.
///
/// # Panics
/// Panics if any string in `options` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn upload_async(
    client: *mut ClientWrapper,
    options: *mut UploadRequestWrapper,
    callback: UploadCallback,
) {
    // Boxes a failed response carrying `message`; ownership passes to the callback.
    fn failed(message: &str) -> *mut UploadResponseWrapper {
        Box::into_raw(Box::new(UploadResponseWrapper {
            success: false,
            id: std::ptr::null(),
            error: CString::new(message).unwrap().into_raw(),
        }))
    }

    let opts = unsafe { &*options };
    let wrapper = unsafe { &mut *client };
    let rt = &wrapper.runtime;

    let local_path = unsafe { CStr::from_ptr(opts.filepath) }.to_str().unwrap();
    if local_path.is_empty() {
        return callback(failed("Filepath is required"));
    }
    let local_path = local_path.to_string();
    debug!("Rust::upload_async: filepath: {}", local_path);

    let name = unsafe { CStr::from_ptr(opts.filename) }.to_str().unwrap();
    if name.is_empty() {
        return callback(failed("Filename is required"));
    }

    let request = UploadRequest {
        filename: name.to_string(),
        mimetype: unsafe { CStr::from_ptr(opts.mimetype) }
            .to_str()
            .unwrap()
            .to_string(),
        metadata: unsafe { CStr::from_ptr(opts.metadata) }
            .to_str()
            .unwrap()
            .to_string(),
        collectionname: unsafe { CStr::from_ptr(opts.collectionname) }
            .to_str()
            .unwrap()
            .to_string(),
        ..Default::default()
    };

    let connected = match wrapper.client.as_ref() {
        Some(connected) => connected,
        None => return callback(failed("Client is not connected")),
    };

    debug!("Rust::upload_async: runtime.spawn");
    rt.spawn(async move {
        debug!("Rust::upload_async: call client.upload");
        let outcome = connected.upload(request, &local_path).await;
        debug!("Rust::upload_async: call client.upload done");
        let response = match outcome {
            Ok(data) => Box::into_raw(Box::new(UploadResponseWrapper {
                success: true,
                id: CString::new(data.id).unwrap().into_raw(),
                error: std::ptr::null(),
            })),
            Err(e) => failed(&format!("Upload failed: {:?}", e)),
        };
        debug!("Rust::upload_async: call callback with response");
        callback(response);
    });
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_upload_response(response: *mut UploadResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout options consumed by `watch` and `watch_async`.
///
/// Both fields are read as NUL-terminated C strings and must be valid UTF-8.
#[repr(C)]
pub struct WatchRequestWrapper {
/// Copied verbatim into the watch request's `collectionname`.
collectionname: *const c_char,
/// Comma-separated list; split on `,` into the request's `paths`.
paths: *const c_char,
}
/// C-layout result of `watch` / `watch_async`.
///
/// Instances are handed to C as `Box::into_raw` pointers; release the wrapper
/// with `free_watch_response`.
#[repr(C)]
pub struct WatchResponseWrapper {
/// `true` when the watch was registered, `false` on any failure.
success: bool,
/// Id returned by the client for the new watch; null on failure.
watchid: *const c_char,
/// Human-readable failure description; null on success.
error: *const c_char,
}
/// Registers a watch synchronously and buffers its events for polling.
///
/// `options.paths` is split on `,`. Incoming events are appended to a queue in
/// the global `WATCH_EVENTS` map, keyed by the event's `id`; callers retrieve
/// them with `next_watch_event`. On success an (empty) queue is created for the
/// returned watch id so polling finds it before the first event arrives.
///
/// Returns a `Box::into_raw` pointer to a `WatchResponseWrapper`; `success` is
/// `false` with `error` set when the client is not connected or the call fails.
///
/// # Panics
/// Panics if a string in `options` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn watch(
    client: *mut ClientWrapper,
    options: *mut WatchRequestWrapper,
) -> *mut WatchResponseWrapper {
    let options = unsafe { &*options };
    let client_wrapper = unsafe { &mut *client };
    let runtime = &client_wrapper.runtime;
    let paths = unsafe { CStr::from_ptr(options.paths).to_str().unwrap() };
    let paths = paths.split(',').map(String::from).collect();
    let request = WatchRequest {
        collectionname: unsafe { CStr::from_ptr(options.collectionname).to_str().unwrap() }
            .to_string(),
        paths,
    };
    let client = match client_wrapper.client.as_ref() {
        Some(client) => client,
        None => {
            let error_msg = CString::new("Client is not connected").unwrap().into_raw();
            let response = WatchResponseWrapper {
                success: false,
                watchid: std::ptr::null(),
                error: error_msg,
            };
            return Box::into_raw(Box::new(response));
        }
    };
    let result = runtime.block_on(async {
        client
            .watch(
                request,
                Box::new(move |event: WatchEvent| {
                    debug!("Rust::watch: event: {:?}", event);
                    // Key on the id directly: a detour through `CString` only
                    // added a panic path for ids containing a NUL byte.
                    let watchid = event.id.clone();
                    let mut queues = WATCH_EVENTS.lock().unwrap();
                    queues.entry(watchid).or_default().push_back(event);
                }),
            )
            .await
    });
    let response = match result {
        Ok(id) => {
            // Create the queue up front so polling can tell "no event yet"
            // apart from "unknown watch id".
            WATCH_EVENTS.lock().unwrap().entry(id.clone()).or_default();
            WatchResponseWrapper {
                success: true,
                watchid: CString::new(id).unwrap().into_raw(),
                error: std::ptr::null(),
            }
        }
        Err(e) => {
            let error_msg = CString::new(format!("Watch failed: {:?}", e))
                .unwrap()
                .into_raw();
            WatchResponseWrapper {
                success: false,
                watchid: std::ptr::null(),
                error: error_msg,
            }
        }
    };
    Box::into_raw(Box::new(response))
}
/// Pops the oldest buffered event for `watchid`, if any.
///
/// Always returns a `Box::into_raw` pointer to a `WatchEventWrapper`. When the
/// queue is empty, or no queue exists for `watchid`, the wrapper is the default
/// one with all fields null.
///
/// # Panics
/// Panics if `watchid` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn next_watch_event(watchid: *const c_char) -> *mut WatchEventWrapper {
    debug!("unwrap watchid");
    let key = unsafe { CStr::from_ptr(watchid) }.to_str().unwrap();
    debug!("watchid {:}", key);
    debug!("unwrap events");
    let mut queues = WATCH_EVENTS.lock().unwrap();
    debug!("get queue");
    // Outer `Option`: does the queue exist? Inner `Option`: did it hold an event?
    let wrapper = match queues.get_mut(key).map(|pending| pending.pop_front()) {
        Some(Some(event)) => {
            debug!("got event");
            WatchEventWrapper {
                id: CString::new(event.id).unwrap().into_raw(),
                operation: CString::new(event.operation).unwrap().into_raw(),
                document: CString::new(event.document).unwrap().into_raw(),
            }
        }
        Some(None) => {
            debug!("No event");
            WatchEventWrapper::default()
        }
        None => {
            debug!("Queue for {:} not found", key);
            WatchEventWrapper::default()
        }
    };
    Box::into_raw(Box::new(wrapper))
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_watch_event(response: *mut WatchEventWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C callback that receives the heap-allocated result of `watch_async`.
type WatchCallback = extern "C" fn(wrapper: *mut WatchResponseWrapper);

/// Registers a watch on the client's runtime and pushes events to C directly.
///
/// `callback` receives the registration result as a `Box::into_raw` pointer to
/// a `WatchResponseWrapper`; it is invoked immediately when the client is not
/// connected, otherwise from the spawned task. `event_callback` is invoked for
/// every event with the event serialized as JSON; the string is only valid for
/// the duration of that call.
///
/// # Panics
/// Panics if a string in `options` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn watch_async(
    client: *mut ClientWrapper,
    options: *mut WatchRequestWrapper,
    callback: WatchCallback,
    event_callback: extern "C" fn(*const c_char),
) {
    debug!("Rust::watch_async");
    let opts = unsafe { &*options };
    let wrapper = unsafe { &mut *client };
    let rt = &wrapper.runtime;

    let raw_paths = unsafe { CStr::from_ptr(opts.paths) }.to_str().unwrap();
    let collection = unsafe { CStr::from_ptr(opts.collectionname) }
        .to_str()
        .unwrap()
        .to_string();
    let request = WatchRequest {
        collectionname: collection,
        paths: raw_paths.split(",").map(String::from).collect(),
    };

    let connected = match wrapper.client.as_ref() {
        Some(connected) => connected,
        None => {
            let response = WatchResponseWrapper {
                success: false,
                watchid: std::ptr::null(),
                error: CString::new("Client is not connected").unwrap().into_raw(),
            };
            return callback(Box::into_raw(Box::new(response)));
        }
    };

    debug!("Rust::watch_async: runtime.spawn");
    rt.spawn(async move {
        debug!("Rust::watch_async: call client.watch");
        let outcome = connected
            .watch(
                request,
                Box::new(move |event: WatchEvent| {
                    let json = serde_json::to_string(&event).unwrap();
                    // `payload` is dropped when the closure returns, so the
                    // pointer must not be retained by the C side.
                    let payload = std::ffi::CString::new(json).unwrap();
                    event_callback(payload.as_ptr());
                }),
            )
            .await;
        let response = match outcome {
            Ok(id) => WatchResponseWrapper {
                success: true,
                watchid: CString::new(id).unwrap().into_raw(),
                error: std::ptr::null(),
            },
            Err(e) => WatchResponseWrapper {
                success: false,
                watchid: std::ptr::null(),
                error: CString::new(format!("Watch failed: {:?}", e))
                    .unwrap()
                    .into_raw(),
            },
        };
        debug!("Rust::watch_async: call callback with response");
        callback(Box::into_raw(Box::new(response)));
    });
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_watch_response(response: *mut WatchResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout result of `unwatch`.
///
/// Instances are handed to C as `Box::into_raw` pointers; release the wrapper
/// with `free_unwatch_response`.
#[repr(C)]
pub struct UnWatchResponseWrapper {
/// `true` when the client removed the watch, `false` on any failure.
success: bool,
/// Human-readable failure description; null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn unwatch(
client: *mut ClientWrapper,
watchid: *const c_char,
) -> *mut UnWatchResponseWrapper {
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let watchid = unsafe { CStr::from_ptr(watchid).to_str().unwrap() };
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = UnWatchResponseWrapper {
success: false,
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
let c = client.as_ref().unwrap();
c.unwatch(watchid).await
});
match result {
Ok(_) => {
let response = UnWatchResponseWrapper {
success: true,
error: std::ptr::null(),
};
Box::into_raw(Box::new(response))
}
Err(e) => {
let error_msg = CString::new(format!("Unwatch failed: {:?}", e))
.unwrap()
.into_raw();
let response = UnWatchResponseWrapper {
success: false,
error: error_msg,
};
Box::into_raw(Box::new(response))
}
}
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_unwatch_response(response: *mut UnWatchResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout options consumed by `register_queue`.
#[repr(C)]
pub struct RegisterQueueRequestWrapper {
/// NUL-terminated UTF-8 string copied into the request's `queuename`.
queuename: *const c_char,
}
/// C-layout result of `register_queue`.
///
/// Instances are handed to C as `Box::into_raw` pointers; release the wrapper
/// with `free_register_queue_response`.
#[repr(C)]
pub struct RegisterQueueResponseWrapper {
/// `true` when the queue was registered, `false` on any failure.
success: bool,
/// Queue name returned by the client; null on failure.
queuename: *const c_char,
/// Human-readable failure description; null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn register_queue(
client: *mut ClientWrapper,
options: *mut RegisterQueueRequestWrapper,
) -> *mut RegisterQueueResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = RegisterQueueRequest {
queuename: unsafe { CStr::from_ptr(options.queuename).to_str().unwrap() }
.to_string(),
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = RegisterQueueResponseWrapper {
success: false,
queuename: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client
.as_ref()
.unwrap()
.register_queue(
request,
Box::new(move |event: QueueEvent| {
println!("Rust::queue: event: {:?}", event);
let queuename = CString::new(event.queuename.clone())
.unwrap()
.into_string()
.unwrap();
let mut e = QUEUE_EVENTS.lock().unwrap();
let queue = e.get_mut(&queuename);
match queue {
Some(q) => {
q.push_back(event);
}
None => {
let mut q = std::collections::VecDeque::new();
q.push_back(event);
e.insert(queuename, q);
}
}
}),
)
.await
});
let response = match result {
Ok(data) => {
let id = String::from(&data);
let mut events = QUEUE_EVENTS.lock().unwrap();
let queue = events.get_mut(&id);
if queue.is_none() {
let q = std::collections::VecDeque::new();
let k = String::from(&data);
events.insert(k, q);
}
let queuename = CString::new(id).unwrap().into_raw();
RegisterQueueResponseWrapper {
success: true,
queuename,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("queue failed: {:?}", e))
.unwrap()
.into_raw();
RegisterQueueResponseWrapper {
success: false,
queuename: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_register_queue_response(response: *mut RegisterQueueResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout options consumed by `register_exchange`.
///
/// The string fields are read as NUL-terminated C strings and must be valid
/// UTF-8.
#[repr(C)]
pub struct RegisterExchangeRequestWrapper {
/// Copied verbatim into the request's `exchangename`.
exchangename: *const c_char,
/// Copied verbatim into the request's `algorithm`.
algorithm: *const c_char,
/// Copied verbatim into the request's `routingkey`.
routingkey: *const c_char,
/// Passed through unchanged as the request's `addqueue` flag.
addqueue: bool
}
/// C-layout result of `register_exchange`, handed to C as a `Box::into_raw`
/// pointer.
#[repr(C)]
pub struct RegisterExchangeResponseWrapper {
/// `true` when the exchange was registered, `false` on any failure.
success: bool,
/// `queuename` taken from the client's response; null on failure.
queuename: *const c_char,
/// Human-readable failure description; null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn register_exchange (
client: *mut ClientWrapper,
options: *mut RegisterExchangeRequestWrapper,
) -> *mut RegisterExchangeResponseWrapper {
let options = unsafe { &*options };
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let request = RegisterExchangeRequest {
exchangename: unsafe { CStr::from_ptr(options.exchangename).to_str().unwrap() }
.to_string(),
algorithm: unsafe { CStr::from_ptr(options.algorithm).to_str().unwrap() }
.to_string(),
routingkey: unsafe { CStr::from_ptr(options.routingkey).to_str().unwrap() }
.to_string(),
addqueue: options.addqueue,
};
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = RegisterExchangeResponseWrapper {
success: false,
queuename: std::ptr::null(),
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
client
.as_ref()
.unwrap()
.register_exchange(request,
Box::new(move |event: QueueEvent| {
println!("Rust::exchange: event: {:?}", event);
let queuename = CString::new(event.queuename.clone())
.unwrap()
.into_string()
.unwrap();
let mut e = QUEUE_EVENTS.lock().unwrap();
let queue = e.get_mut(&queuename);
match queue {
Some(q) => {
q.push_back(event);
}
None => {
let mut q = std::collections::VecDeque::new();
q.push_back(event);
e.insert(queuename, q);
}
}
}),
)
.await
});
let response = match result {
Ok(data) => {
let queuename = CString::new(data.queuename).unwrap().into_raw();
RegisterExchangeResponseWrapper {
success: true,
queuename,
error: std::ptr::null(),
}
}
Err(e) => {
let error_msg = CString::new(format!("RegisterExchange failed: {:?}", e))
.unwrap()
.into_raw();
RegisterExchangeResponseWrapper {
success: false,
queuename: std::ptr::null(),
error: error_msg,
}
}
};
Box::into_raw(Box::new(response))
}
/// C-layout view of one queue message as returned by `next_queue_event`.
///
/// Each field is either null or a NUL-terminated string. The all-null value
/// (see the `Default` impl) stands for "no event available".
#[repr(C)]
#[derive(Debug, Clone)]
pub struct QueueEventWrapper {
    /// Name of the queue the message was delivered on.
    queuename: *const c_char,
    /// Correlation id carried by the message.
    correlation_id: *const c_char,
    /// Reply-to value carried by the message.
    replyto: *const c_char,
    /// Routing key carried by the message.
    routingkey: *const c_char,
    /// Exchange name carried by the message.
    exchangename: *const c_char,
    /// Message payload.
    data: *const c_char,
}

impl Default for QueueEventWrapper {
    /// Builds the empty wrapper: every string pointer is null.
    fn default() -> Self {
        let unset: *const c_char = std::ptr::null();
        Self {
            queuename: unset,
            correlation_id: unset,
            replyto: unset,
            routingkey: unset,
            exchangename: unset,
            data: unset,
        }
    }
}
/// Pops the oldest buffered message for `queuename`, if any.
///
/// Always returns a `Box::into_raw` pointer to a `QueueEventWrapper`; release
/// it with `free_queue_event`. When the queue is empty, or no queue exists for
/// `queuename`, the wrapper is the default one with all fields null.
///
/// # Panics
/// Panics if `queuename` is not valid UTF-8.
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn next_queue_event(queuename: *const c_char) -> *mut QueueEventWrapper {
    debug!("unwrap queuename");
    let queuename = unsafe { CStr::from_ptr(queuename).to_str().unwrap() };
    debug!("queuename {:}", queuename);
    debug!("unwrap events");
    let mut queues = QUEUE_EVENTS.lock().unwrap();
    debug!("get queue");
    // Outer `Option`: does the queue exist? Inner `Option`: did it hold a message?
    let popped = queues.get_mut(queuename).map(|pending| pending.pop_front());
    // Release the global lock before the C-string conversions below, so the
    // event callbacks are not blocked while the wrapper is built.
    drop(queues);
    let wrapper = match popped {
        Some(Some(event)) => {
            debug!("got event");
            QueueEventWrapper {
                queuename: CString::new(event.queuename).unwrap().into_raw(),
                correlation_id: CString::new(event.correlation_id).unwrap().into_raw(),
                replyto: CString::new(event.replyto).unwrap().into_raw(),
                routingkey: CString::new(event.routingkey).unwrap().into_raw(),
                exchangename: CString::new(event.exchangename).unwrap().into_raw(),
                data: CString::new(event.data).unwrap().into_raw(),
            }
        }
        Some(None) => {
            debug!("No event");
            QueueEventWrapper::default()
        }
        None => {
            debug!("Queue for {:} not found", queuename);
            QueueEventWrapper::default()
        }
    };
    Box::into_raw(Box::new(wrapper))
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_queue_event(response: *mut QueueEventWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}
/// C-layout result of `unregister_queue`.
///
/// Instances are handed to C as `Box::into_raw` pointers; release the wrapper
/// with `free_unregister_queue_response`.
#[repr(C)]
pub struct UnRegisterQueueResponseWrapper {
/// `true` when the client unregistered the queue, `false` on any failure.
success: bool,
/// Human-readable failure description; null on success.
error: *const c_char,
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn unregister_queue(
client: *mut ClientWrapper,
queuename: *const c_char,
) -> *mut UnRegisterQueueResponseWrapper {
let client_wrapper = unsafe { &mut *client };
let client = &client_wrapper.client;
let runtime = &client_wrapper.runtime;
let queuename = unsafe { CStr::from_ptr(queuename).to_str().unwrap() };
if client.is_none() {
let error_msg = CString::new("Client is not connected").unwrap().into_raw();
let response = UnRegisterQueueResponseWrapper {
success: false,
error: error_msg,
};
return Box::into_raw(Box::new(response));
}
let result = runtime.block_on(async {
let c = client.as_ref().unwrap();
c.unregister_queue(queuename).await
});
match result {
Ok(_) => {
let response = UnRegisterQueueResponseWrapper {
success: true,
error: std::ptr::null(),
};
Box::into_raw(Box::new(response))
}
Err(e) => {
let error_msg = CString::new(format!("Unregister queue failed: {:?}", e))
.unwrap()
.into_raw();
let response = UnRegisterQueueResponseWrapper {
success: false,
error: error_msg,
};
Box::into_raw(Box::new(response))
}
}
}
#[no_mangle]
#[tracing::instrument(skip_all)]
pub extern "C" fn free_unregister_queue_response(response: *mut UnRegisterQueueResponseWrapper) {
if response.is_null() {
return;
}
unsafe {
let _ = Box::from_raw(response);
}
}