use http::uri::Uri;
use std::{ptr, time::Duration};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tracing::Level;
use crate::rpc::dial::{
DialBuilder, DialOptions, RPCCredentials, ViamChannel, WithCredentials, WithoutCredentials,
};
use libc::{c_char, c_void};
use crate::proxy;
use hyper::Server;
use std::ffi::{CStr, CString};
use tower::{make::Shared, ServiceBuilder};
use tower_http::{
trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use anyhow::Result;
use crate::proxy::grpc_proxy::GRPCProxy;
pub struct DialFfi {
runtime: Option<Runtime>,
sigs: Option<Vec<oneshot::Sender<()>>>,
channels: Vec<ViamChannel>,
}
impl Drop for DialFfi {
fn drop(&mut self) {
log::debug!("FFI runtime closing");
if let Some(r) = self.runtime.take() {
r.shutdown_timeout(Duration::from_secs(1));
}
}
}
impl DialFfi {
fn new() -> Self {
Self {
runtime: Some(Runtime::new().unwrap()),
sigs: None,
channels: vec![],
}
}
fn push_signal(&mut self, sig: oneshot::Sender<()>) {
match self.sigs {
Some(ref mut v) => v.push(sig),
None => {
let v: Vec<oneshot::Sender<()>> = vec![sig];
self.sigs = Some(v);
}
}
}
}
#[derive(Default)]
struct DialOpts {
force_relay: bool,
force_p2p: bool,
turn_uri: Option<String>,
}
#[no_mangle]
pub extern "C" fn viam_init_rust_runtime() -> Box<DialFfi> {
let _ = tracing_subscriber::fmt::try_init();
Box::new(DialFfi::new())
}
#[no_mangle]
#[deprecated]
pub extern "C" fn init_rust_runtime() -> Box<DialFfi> {
viam_init_rust_runtime()
}
#[no_mangle]
pub extern "C" fn viam_dial_opts_new() -> *mut c_void {
Box::into_raw(Box::<DialOpts>::default()) as *mut c_void
}
#[no_mangle]
pub unsafe extern "C" fn viam_dial_opts_free(opts: *mut c_void) {
if opts.is_null() {
return;
}
drop(Box::from_raw(opts as *mut DialOpts));
}
#[no_mangle]
pub unsafe extern "C" fn viam_dial_opts_set_force_relay(opts: *mut c_void, value: bool) {
if opts.is_null() {
return;
}
(*(opts as *mut DialOpts)).force_relay = value;
}
#[no_mangle]
pub unsafe extern "C" fn viam_dial_opts_set_force_p2p(opts: *mut c_void, value: bool) {
if opts.is_null() {
return;
}
(*(opts as *mut DialOpts)).force_p2p = value;
}
#[no_mangle]
pub unsafe extern "C" fn viam_dial_opts_set_turn_uri(opts: *mut c_void, value: *const c_char) {
if opts.is_null() {
return;
}
let opts = &mut *(opts as *mut DialOpts);
if value.is_null() {
opts.turn_uri = None;
return;
}
match CStr::from_ptr(value).to_str() {
Ok(s) if !s.is_empty() => opts.turn_uri = Some(s.to_string()),
Ok(_) => opts.turn_uri = None,
Err(e) => log::error!("invalid turn_uri string: {e:?}"),
}
}
fn dial_without_cred(
uri: String,
allow_insec: bool,
disable_webrtc: bool,
opts: &DialOpts,
) -> Result<DialBuilder<WithoutCredentials>> {
let c = DialOptions::builder().uri(&uri).without_credentials();
let c = if disable_webrtc { c.disable_webrtc() } else { c };
let c = if allow_insec { c.allow_downgrade() } else { c };
let c = if opts.force_relay { c.force_relay() } else { c };
let c = if opts.force_p2p { c.force_p2p() } else { c };
let c = if let Some(u) = opts.turn_uri.clone() {
c.turn_uri(u)
} else {
c
};
Ok(c)
}
fn dial_with_cred(
uri: String,
entity: Option<String>,
r#type: &str,
payload: &str,
allow_insec: bool,
disable_webrtc: bool,
opts: &DialOpts,
) -> Result<DialBuilder<WithCredentials>> {
let creds = RPCCredentials::new(entity, String::from(r#type), String::from(payload));
let c = DialOptions::builder().uri(&uri).with_credentials(creds);
let c = if disable_webrtc { c.disable_webrtc() } else { c };
let c = if allow_insec { c.allow_downgrade() } else { c };
let c = if opts.force_relay { c.force_relay() } else { c };
let c = if opts.force_p2p { c.force_p2p() } else { c };
let c = if let Some(u) = opts.turn_uri.clone() {
c.turn_uri(u)
} else {
c
};
Ok(c)
}
unsafe fn dial_impl(
c_uri: *const c_char,
c_entity: *const c_char,
c_type: *const c_char,
c_payload: *const c_char,
c_allow_insec: bool,
c_timeout: f32,
rt_ptr: Option<&mut DialFfi>,
opts: &DialOpts,
) -> *mut c_char {
let uri = {
if c_uri.is_null() {
return ptr::null_mut();
}
let ur = match Uri::from_maybe_shared(CStr::from_ptr(c_uri).to_bytes()) {
Ok(ur) => ur,
Err(e) => {
log::error!("Sorry {e:?} is not a valid URI");
return ptr::null_mut();
}
};
ur
};
let allow_insec = c_allow_insec;
let ctx = match rt_ptr {
Some(rt) => rt,
None => {
return ptr::null_mut();
}
};
let runtime = match &ctx.runtime {
Some(r) => r,
None => {
return ptr::null_mut();
}
};
let conn = match runtime.block_on(async { proxy::connector::Connector::new() }) {
Ok(conn) => conn,
Err(e) => {
log::error!("Error creating the proxy {e:?}");
return ptr::null_mut();
}
};
let path = match CString::new(conn.get_path()) {
Ok(s) => s,
Err(e) => {
log::error!("Error getting the path {e:?}");
return ptr::null_mut();
}
};
let (tx, rx) = oneshot::channel::<()>();
let uri_str = uri.to_string();
let disable_webrtc;
if let Some(host) = uri.host() {
disable_webrtc = host.contains(".local") || host.contains("localhost");
} else {
disable_webrtc = uri_str.contains(".local") || uri_str.contains("localhost");
}
let r#type = {
match c_type.is_null() {
true => None,
false => Some(CStr::from_ptr(c_type)),
}
};
let payload = {
match c_payload.is_null() {
true => None,
false => Some(CStr::from_ptr(c_payload)),
}
};
let entity_opt = {
match c_entity.is_null() {
true => None,
false => match CStr::from_ptr(c_entity).to_str() {
Ok(ent) => Some(ent.to_string()),
Err(e) => {
log::error!(
"Error unexpectedly received an invalid entity string {:?}",
e
);
return ptr::null_mut();
}
},
}
};
let timeout_duration = Duration::from_secs_f32(c_timeout);
let (server, channel) = match runtime.block_on(async move {
let channel = match (r#type, payload) {
(Some(t), Some(p)) => {
timeout(
timeout_duration,
dial_with_cred(
uri_str,
entity_opt,
t.to_str()?,
p.to_str()?,
allow_insec,
disable_webrtc,
opts,
)?
.connect(),
)
.await?
}
(None, None) => {
timeout(
timeout_duration,
dial_without_cred(uri_str, allow_insec, disable_webrtc, opts)?.connect(),
)
.await?
}
(None, Some(_)) => Err(anyhow::anyhow!("Error missing credential: type")),
(Some(_), None) => Err(anyhow::anyhow!("Error missing credential: payload")),
}?;
let dial = channel.clone();
let g = GRPCProxy::new(dial, uri);
let service = ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::new().include_headers(true))
.on_request(DefaultOnRequest::new().level(Level::INFO))
.on_response(
DefaultOnResponse::new()
.level(Level::INFO)
.latency_unit(LatencyUnit::Micros),
),
)
.service(g);
let server = Server::builder(conn)
.http2_only(true)
.serve(Shared::new(service));
Ok::<_, Box<dyn std::error::Error>>((server, channel))
}) {
Ok(s) => s,
Err(e) => {
log::error!("Error building GRPC proxy reason : {}", e);
return ptr::null_mut();
}
};
ctx.channels.push(channel);
let server = server.with_graceful_shutdown(async {
rx.await.ok();
});
let _ = runtime.spawn(async {
let _ = server.await;
});
ctx.push_signal(tx);
path.into_raw()
}
#[no_mangle]
#[deprecated(note = "please use viam_dial_with_opts instead")]
pub unsafe extern "C" fn viam_dial(
c_uri: *const c_char,
c_entity: *const c_char,
c_type: *const c_char,
c_payload: *const c_char,
c_allow_insec: bool,
c_timeout: f32,
rt_ptr: Option<&mut DialFfi>,
) -> *mut c_char {
dial_impl(
c_uri,
c_entity,
c_type,
c_payload,
c_allow_insec,
c_timeout,
rt_ptr,
&DialOpts::default(),
)
}
#[no_mangle]
pub unsafe extern "C" fn viam_dial_with_opts(
c_uri: *const c_char,
c_entity: *const c_char,
c_type: *const c_char,
c_payload: *const c_char,
c_allow_insec: bool,
c_timeout: f32,
rt_ptr: Option<&mut DialFfi>,
opts: *const c_void,
) -> *mut c_char {
let default_opts;
let opts_ref: &DialOpts = if opts.is_null() {
default_opts = DialOpts::default();
&default_opts
} else {
&*(opts as *const DialOpts)
};
dial_impl(
c_uri,
c_entity,
c_type,
c_payload,
c_allow_insec,
c_timeout,
rt_ptr,
opts_ref,
)
}
#[no_mangle]
#[deprecated]
pub unsafe extern "C" fn dial(
c_uri: *const c_char,
c_entity: *const c_char,
c_type: *const c_char,
c_payload: *const c_char,
c_allow_insec: bool,
c_timeout: f32,
rt_ptr: Option<&mut DialFfi>,
) -> *mut c_char {
dial_impl(
c_uri,
c_entity,
c_type,
c_payload,
c_allow_insec,
c_timeout,
rt_ptr,
&DialOpts::default(),
)
}
#[no_mangle]
pub unsafe extern "C" fn viam_free_string(s: *mut c_char) {
if s.is_null() {
return;
}
log::debug!("freeing string: {s:?}");
let _ = CString::from_raw(s);
}
#[no_mangle]
#[deprecated]
pub unsafe extern "C" fn free_string(s: *mut c_char) {
viam_free_string(s)
}
#[no_mangle]
pub extern "C" fn viam_free_rust_runtime(rt_ptr: Option<Box<DialFfi>>) -> i32 {
let mut ctx = match rt_ptr {
Some(ctx) => ctx,
None => {
return -1;
}
};
if let Some(sigs) = ctx.sigs.take() {
for sig in sigs {
let _ = sig.send(());
}
}
for channel in &ctx.channels {
match channel {
ViamChannel::Direct(_) => (),
ViamChannel::DirectPreAuthorized(_) => (),
ViamChannel::WebRTC(chan) => ctx
.runtime
.as_ref()
.map(|rt| rt.block_on(async move { chan.close().await }))
.unwrap_or_default(),
}
}
log::debug!("Freeing rust runtime");
0
}
#[no_mangle]
#[deprecated]
pub extern "C" fn free_rust_runtime(rt_ptr: Option<Box<DialFfi>>) -> i32 {
viam_free_rust_runtime(rt_ptr)
}