#![allow(clippy::unnecessary_cast, reason = "platform specific")]
use std::cell::RefCell;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::ffi::c_void;
use std::rc::Rc;
use deno_core::OpState;
use deno_core::cppgc;
use deno_core::op2;
use deno_core::serde_v8;
use deno_core::v8;
use libnghttp2 as ffi;
use serde::Serialize;
/// Alias for nghttp2's `nghttp2_ssize`; it is the return type of the
/// `on_stream_read_callback` and `on_select_padding` callbacks in this file.
type CSsizeT = ffi::nghttp2_ssize;
use super::stream::Http2Headers;
use super::stream::Http2Priority;
use super::stream::Http2Stream;
use super::types::*;
/// Number of `f32` slots in the shared session-state buffer.
const SESSION_STATE_LEN: usize = SessionStateIndex::Count as usize;
/// Number of `f32` slots in the shared stream-state buffer.
const STREAM_STATE_LEN: usize = StreamStateIndex::Count as usize;
/// Number of `u32` slots in the shared options buffer; `Flags` is treated as
/// the last valid index.
const OPTIONS_LEN: usize = OptionsIndex::Flags as usize + 1;
/// Number of `u32` slots in the shared settings buffer: the indexed settings,
/// one slot at `SettingsIndex::Count` (read as a flags bitmask by
/// `Http2Settings::init`), one slot holding the number of additional settings,
/// and a key/value pair for each of up to `MAX_ADDITIONAL_SETTINGS` extras.
const SETTINGS_LEN: usize =
SettingsIndex::Count as usize + 1 + 1 + (2 * MAX_ADDITIONAL_SETTINGS);
// Per-thread scratch buffers. `JSHttp2State::create` wraps each of them in a
// typed array backed by the same memory (no copy), so JavaScript and the Rust
// code in this file read and write the same storage. `UnsafeCell` is used
// because that memory is accessed through raw pointers.
thread_local! {
// Session state values, exposed to JavaScript as a `Float32Array`.
static SESSION_STATE: UnsafeCell<[f32; SESSION_STATE_LEN]> =
const { UnsafeCell::new([0.0; SESSION_STATE_LEN]) };
// Stream state values, exposed to JavaScript as a `Float32Array`.
static STREAM_STATE: UnsafeCell<[f32; STREAM_STATE_LEN]> =
const { UnsafeCell::new([0.0; STREAM_STATE_LEN]) };
// Session options, exposed as a `Uint32Array`; read by `Http2Options::new`.
static OPTIONS: UnsafeCell<[u32; OPTIONS_LEN]> =
const { UnsafeCell::new([0; OPTIONS_LEN]) };
// HTTP/2 settings, exposed as a `Uint32Array`; read by `Http2Settings::init`.
static SETTINGS: UnsafeCell<[u32; SETTINGS_LEN]> =
const { UnsafeCell::new([0; SETTINGS_LEN]) };
}
/// Bundle of typed-array views over the thread-local HTTP/2 buffers.
///
/// Serialized with camelCase keys (`sessionState`, `streamState`,
/// `optionsBuffer`, `settingsBuffer`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct JSHttp2State<'a> {
// `Float32Array` view over `SESSION_STATE`.
session_state: serde_v8::Value<'a>,
// `Float32Array` view over `STREAM_STATE`.
stream_state: serde_v8::Value<'a>,
// `Uint32Array` view over `OPTIONS`.
options_buffer: serde_v8::Value<'a>,
// `Uint32Array` view over `SETTINGS`.
settings_buffer: serde_v8::Value<'a>,
}
impl<'a> JSHttp2State<'a> {
  /// Creates typed-array views over the four thread-local buffers.
  ///
  /// Each view is backed directly by the thread-local storage, so values
  /// written from JavaScript are visible to the Rust helpers in this file
  /// (`with_options`, `with_settings`) without copying.
  pub fn create(scope: &mut v8::PinScope<'a, 'a>) -> Self {
    // Fields are evaluated in declaration order, which keeps the V8
    // allocations in the same order: session, stream, options, settings.
    Self {
      session_state: SESSION_STATE.with(|state| {
        create_f32_array(scope, state.get().cast::<f32>(), SESSION_STATE_LEN)
      }),
      stream_state: STREAM_STATE.with(|state| {
        create_f32_array(scope, state.get().cast::<f32>(), STREAM_STATE_LEN)
      }),
      options_buffer: OPTIONS.with(|state| {
        create_u32_array(scope, state.get().cast::<u32>(), OPTIONS_LEN)
      }),
      settings_buffer: SETTINGS.with(|state| {
        create_u32_array(scope, state.get().cast::<u32>(), SETTINGS_LEN)
      }),
    }
  }
}
/// Wraps `len` `f32` values starting at `buffer` in a V8 `Float32Array`.
///
/// The backing store is created from the raw pointer with `nop_deleter`, so
/// V8 never frees the memory; the caller must keep it alive for as long as
/// the array is reachable.
fn create_f32_array<'a>(
  scope: &mut v8::PinScope<'a, 'a>,
  buffer: *mut f32,
  len: usize,
) -> serde_v8::Value<'a> {
  let byte_len = len * std::mem::size_of::<f32>();
  // SAFETY: callers pass a pointer to `len` contiguous `f32` values; the
  // no-op deleter means V8 never attempts to release that memory.
  let store = unsafe {
    v8::ArrayBuffer::new_backing_store_from_ptr(
      buffer.cast::<c_void>(),
      byte_len,
      nop_deleter,
      std::ptr::null_mut(),
    )
  };
  let array_buffer =
    v8::ArrayBuffer::with_backing_store(scope, &store.make_shared());
  let view = v8::Float32Array::new(scope, array_buffer, 0, len).unwrap();
  view.into()
}
/// Wraps `len` `u32` values starting at `buffer` in a V8 `Uint32Array`.
///
/// The backing store is created from the raw pointer with `nop_deleter`, so
/// V8 never frees the memory; the caller must keep it alive for as long as
/// the array is reachable.
fn create_u32_array<'a>(
  scope: &mut v8::PinScope<'a, 'a>,
  buffer: *mut u32,
  len: usize,
) -> serde_v8::Value<'a> {
  let byte_len = len * std::mem::size_of::<u32>();
  // SAFETY: callers pass a pointer to `len` contiguous `u32` values; the
  // no-op deleter means V8 never attempts to release that memory.
  let store = unsafe {
    v8::ArrayBuffer::new_backing_store_from_ptr(
      buffer.cast::<c_void>(),
      byte_len,
      nop_deleter,
      std::ptr::null_mut(),
    )
  };
  let array_buffer =
    v8::ArrayBuffer::with_backing_store(scope, &store.make_shared());
  let view = v8::Uint32Array::new(scope, array_buffer, 0, len).unwrap();
  view.into()
}
/// Backing-store deleter that intentionally does nothing.
///
/// Passed to `new_backing_store_from_ptr` by `create_f32_array` and
/// `create_u32_array` so that V8 does not free the memory those arrays wrap.
unsafe extern "C" fn nop_deleter(
_data: *mut c_void,
_byte_length: usize,
_deleter_data: *mut c_void,
) {
}
/// Runs `f` with mutable access to the thread-local settings buffer and
/// returns whatever `f` returns.
fn with_settings<F, R>(f: F) -> R
where
  F: FnOnce(&mut [u32; SETTINGS_LEN]) -> R,
{
  // SAFETY: the cell is thread-local and the reference does not outlive this
  // call. NOTE(review): the same memory is also exposed to JavaScript as a
  // typed array; this assumes JS does not touch it while `f` runs.
  SETTINGS.with(|slot| f(unsafe { &mut *slot.get() }))
}
/// Runs `f` with read-only access to the thread-local options buffer and
/// returns whatever `f` returns.
fn with_options<F, R>(f: F) -> R
where
  F: FnOnce(&[u32; OPTIONS_LEN]) -> R,
{
  // SAFETY: the cell is thread-local and the reference does not outlive this
  // call. NOTE(review): the same memory is also exposed to JavaScript as a
  // typed array; this assumes JS does not write to it while `f` runs.
  OPTIONS.with(|slot| f(unsafe { &*slot.get() }))
}
/// Write request handed to the uv-compat layer.
///
/// `#[repr(C)]` with `uv_req` as the first field is what allows `h2_write_cb`
/// to cast the `*mut UvWrite` it receives back to `*mut H2WriteReq` and free
/// the whole allocation.
#[repr(C)]
struct H2WriteReq {
uv_req: deno_core::uv_compat::UvWrite,
// Owned bytes dropped together with the request in `h2_write_cb`.
// NOTE(review): presumably the payload being written, kept alive until the
// write completes — confirm at the site that builds this struct.
data: Vec<u8>,
}
/// Write-completion callback.
///
/// If the request's stream handle carries a non-null `data` pointer, that
/// pointer is treated as the owning `Session` and given a chance to report
/// graceful-close completion. The `H2WriteReq` allocation is always freed
/// afterwards; `_status` is ignored.
unsafe extern "C" fn h2_write_cb(
  req: *mut deno_core::uv_compat::UvWrite,
  _status: i32,
) {
  use deno_core::uv_compat::UvHandle;

  // SAFETY: `req` points to the live request this callback was invoked for.
  let handle = unsafe { (*req).handle } as *mut UvHandle;
  let session_ptr: *mut Session = if handle.is_null() {
    std::ptr::null_mut()
  } else {
    // SAFETY: `handle` was just checked to be non-null.
    unsafe { (*handle).data as *mut Session }
  };
  // SAFETY: a non-null `data` pointer on the handle is the owning `Session`.
  if let Some(session) = unsafe { session_ptr.as_mut() } {
    session.maybe_notify_graceful_close_complete();
  }
  // SAFETY: `req` is the first field of a boxed `#[repr(C)]` `H2WriteReq`, so
  // the cast recovers the original allocation.
  drop(unsafe { Box::from_raw(req as *mut H2WriteReq) });
}
/// Close callback: reclaims and drops the boxed `UvTcp` behind `handle`.
unsafe extern "C" fn h2_stream_close_cb(
  handle: *mut deno_core::uv_compat::UvHandle,
) {
  let tcp = handle.cast::<deno_core::uv_compat::UvTcp>();
  // SAFETY: NOTE(review): this relies on the handle having been allocated as
  // a `Box<UvTcp>` by whoever created it — that site is not visible here.
  drop(unsafe { Box::from_raw(tcp) });
}
/// Shutdown-completion callback.
///
/// Closes the request's stream handle (when it has one), registering
/// `h2_stream_close_cb` as the close callback, then frees the boxed shutdown
/// request. `_status` is ignored.
unsafe extern "C" fn h2_shutdown_cb(
  req: *mut deno_core::uv_compat::UvShutdown,
  _status: i32,
) {
  use deno_core::uv_compat::UvHandle;
  use deno_core::uv_compat::uv_close;

  // SAFETY: `req` points to the live request this callback was invoked for.
  let stream = unsafe { (*req).handle };
  if !stream.is_null() {
    // SAFETY: `stream` is non-null and still owned by the uv-compat layer.
    unsafe {
      uv_close(stream as *mut UvHandle, Some(h2_stream_close_cb));
    }
  }
  // SAFETY: reclaims the box that `req` is freed as here; its allocation
  // site is not visible in this chunk.
  drop(unsafe { Box::from_raw(req) });
}
/// Allocation callback: hands out a zero-filled buffer of `suggested_size`
/// bytes.
///
/// The buffer is a leaked boxed slice; `h2_read_cb` rebuilds the box from
/// `base`/`len` and drops it once the read has been processed.
unsafe extern "C" fn h2_alloc_cb(
  _handle: *mut deno_core::uv_compat::UvHandle,
  suggested_size: usize,
  buf: *mut deno_core::uv_compat::UvBuf,
) {
  let storage: Box<[u8]> = vec![0u8; suggested_size].into_boxed_slice();
  let base = Box::into_raw(storage).cast::<u8>();
  // SAFETY: `buf` is the out-parameter supplied for this allocation request.
  unsafe {
    (*buf).base = base as *mut _;
    (*buf).len = suggested_size;
  }
}
/// Releases a read buffer previously produced by `h2_alloc_cb`.
///
/// Does nothing when `buf` or its `base` pointer is null.
///
/// # Safety
/// When non-null, `buf` must point to a readable `UvBuf` whose `base`/`len`
/// still describe the boxed slice leaked by `h2_alloc_cb`, and that slice
/// must not have been freed already.
unsafe fn h2_free_read_buf(buf: *const deno_core::uv_compat::UvBuf) {
  if buf.is_null() {
    return;
  }
  // SAFETY: `buf` is non-null and readable per the caller contract.
  let (base, len) = unsafe { ((*buf).base, (*buf).len) };
  if base.is_null() {
    return;
  }
  // SAFETY: `base`/`len` describe the boxed slice leaked by `h2_alloc_cb`.
  drop(unsafe {
    Box::from_raw(std::ptr::slice_from_raw_parts_mut(base as *mut u8, len))
  });
}

/// Read callback for the transport stream underneath an HTTP/2 session.
///
/// * If the handle has no session attached (`data` is null), the buffer is
///   freed and nothing else happens.
/// * If `nread < 0`, reading is stopped, the nghttp2 session is terminated
///   with `NGHTTP2_CONNECT_ERROR`, pending output is flushed, and the
///   `onstreamclose` property of the session's JS object is called when it is
///   a function.
/// * If `nread > 0`, the first `nread` bytes are fed to the session.
///
/// In every case the buffer is released exactly once, after the data (or the
/// error) has been processed.
unsafe extern "C" fn h2_read_cb(
  handle: *mut deno_core::uv_compat::UvStream,
  nread: isize,
  buf: *const deno_core::uv_compat::UvBuf,
) {
  // SAFETY: `handle` is the live stream this callback was invoked for.
  let session_ptr = unsafe { (*handle).data as *mut Session };
  if session_ptr.is_null() {
    // SAFETY: the buffer came from `h2_alloc_cb` and has not been freed yet.
    unsafe { h2_free_read_buf(buf) };
    return;
  }
  // SAFETY: a non-null `data` pointer on the handle is the owning `Session`.
  let session = unsafe { &mut *session_ptr };
  if nread < 0 {
    unsafe {
      deno_core::uv_compat::uv_read_stop(handle);
    }
    unsafe {
      ffi::nghttp2_session_terminate_session(
        session.session,
        ffi::NGHTTP2_CONNECT_ERROR as u32,
      );
    }
    session.send_pending_data();
    {
      let mut isolate =
        unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
      v8::scope!(let scope, &mut isolate);
      let context = v8::Local::new(scope, session.context.clone());
      let scope = &mut v8::ContextScope::new(scope, context);
      let this_local = v8::Local::new(scope, &session.this);
      let key = v8::String::new(scope, "onstreamclose").unwrap();
      // Only call the property when it exists and is a function.
      if let Some(Ok(cb)) = this_local
        .get(scope, key.into())
        .map(v8::Local::<v8::Function>::try_from)
      {
        cb.call(scope, this_local.into(), &[]);
      }
    }
    // SAFETY: the buffer came from `h2_alloc_cb` and has not been freed yet.
    unsafe { h2_free_read_buf(buf) };
    return;
  }
  if nread > 0 {
    // SAFETY: `nread` bytes at `base` were filled in for this callback.
    let data = unsafe {
      std::slice::from_raw_parts((*buf).base as *const u8, nread as usize)
    };
    session.receive_data(data);
  }
  // SAFETY: the buffer came from `h2_alloc_cb` and has not been freed yet;
  // `data` above is no longer used.
  unsafe { h2_free_read_buf(buf) };
}
// Bit flags stored in the `OptionsIndex::Flags` slot of the options buffer.
// `Http2Options::new` maps each set bit to the matching
// `nghttp2_option_set_no_*` call.
const OPTIONS_FLAG_NO_AUTO_WINDOW_UPDATE: u32 = 0x1;
const OPTIONS_FLAG_NO_RECV_CLIENT_MAGIC: u32 = 0x2;
const OPTIONS_FLAG_NO_HTTP_MESSAGING: u32 = 0x4;
/// Owning wrapper around an `nghttp2_option` object plus the padding strategy
/// decoded from the options buffer. The nghttp2 object is released in `Drop`.
struct Http2Options {
// Raw nghttp2 options object allocated by `nghttp2_option_new`.
options: *mut ffi::nghttp2_option,
// Strategy decoded from the `OptionsIndex::PaddingStrategy` slot.
padding_strategy: PaddingStrategy,
}
impl Http2Options {
  /// Builds an `nghttp2_option` object from the thread-local options buffer.
  ///
  /// Closed-stream retention is always disabled. Each numeric option is only
  /// applied when its slot is non-zero, except the peer's maximum concurrent
  /// streams, which falls back to 100. Client sessions additionally enable
  /// the built-in ALTSVC and ORIGIN extension frame handling.
  ///
  /// # Panics
  /// Panics if `nghttp2_option_new` fails, because every following setter
  /// would otherwise be handed a null options pointer.
  fn new(session_type: SessionType) -> Self {
    let mut options: *mut ffi::nghttp2_option = std::ptr::null_mut();
    // SAFETY: `options` is a valid out-pointer for the duration of the call.
    let rc = unsafe { ffi::nghttp2_option_new(&mut options) };
    assert!(
      rc == 0 && !options.is_null(),
      "nghttp2_option_new failed (code {rc})"
    );
    let padding_strategy = with_options(|buffer| {
      let flags = buffer[OptionsIndex::Flags as usize];
      // SAFETY: `options` was successfully allocated above and is not shared.
      unsafe {
        ffi::nghttp2_option_set_no_closed_streams(options, 1);
        if flags & OPTIONS_FLAG_NO_AUTO_WINDOW_UPDATE != 0 {
          ffi::nghttp2_option_set_no_auto_window_update(options, 1);
        }
        if flags & OPTIONS_FLAG_NO_RECV_CLIENT_MAGIC != 0 {
          ffi::nghttp2_option_set_no_recv_client_magic(options, 1);
        }
        if flags & OPTIONS_FLAG_NO_HTTP_MESSAGING != 0 {
          ffi::nghttp2_option_set_no_http_messaging(options, 1);
        }
        let max_deflate =
          buffer[OptionsIndex::MaxDeflateDynamicTableSize as usize];
        if max_deflate > 0 {
          ffi::nghttp2_option_set_max_deflate_dynamic_table_size(
            options,
            max_deflate as usize,
          );
        }
        let max_reserved =
          buffer[OptionsIndex::MaxReservedRemoteStreams as usize];
        if max_reserved > 0 {
          ffi::nghttp2_option_set_max_reserved_remote_streams(
            options,
            max_reserved,
          );
        }
        let max_send_header =
          buffer[OptionsIndex::MaxSendHeaderBlockLength as usize];
        if max_send_header > 0 {
          ffi::nghttp2_option_set_max_send_header_block_length(
            options,
            max_send_header as usize,
          );
        }
        let peer_max_concurrent =
          buffer[OptionsIndex::PeerMaxConcurrentStreams as usize];
        if peer_max_concurrent > 0 {
          ffi::nghttp2_option_set_peer_max_concurrent_streams(
            options,
            peer_max_concurrent,
          );
        } else {
          // Unset in the buffer: use 100 as the assumed peer limit.
          ffi::nghttp2_option_set_peer_max_concurrent_streams(options, 100);
        }
        let max_outstanding_pings =
          buffer[OptionsIndex::MaxOutstandingPings as usize];
        if max_outstanding_pings > 0 {
          ffi::nghttp2_option_set_max_outbound_ack(
            options,
            max_outstanding_pings as usize,
          );
        }
        let max_outstanding_settings =
          buffer[OptionsIndex::MaxOutstandingSettings as usize];
        if max_outstanding_settings > 0 {
          ffi::nghttp2_option_set_max_settings(
            options,
            max_outstanding_settings as usize,
          );
        }
        if matches!(session_type, SessionType::Client) {
          ffi::nghttp2_option_set_builtin_recv_extension_type(
            options,
            ffi::NGHTTP2_ALTSVC as u8,
          );
          ffi::nghttp2_option_set_builtin_recv_extension_type(
            options,
            ffi::NGHTTP2_ORIGIN as u8,
          );
        }
      }
      // Any value other than 1, 2 or 3 means "no padding".
      let padding = buffer[OptionsIndex::PaddingStrategy as usize];
      match padding {
        1 => PaddingStrategy::Aligned,
        2 => PaddingStrategy::Max,
        3 => PaddingStrategy::Callback,
        _ => PaddingStrategy::None,
      }
    });
    Self {
      options,
      padding_strategy,
    }
  }

  /// Returns the raw nghttp2 options pointer; ownership stays with `self`.
  fn ptr(&self) -> *mut ffi::nghttp2_option {
    self.options
  }

  /// Returns the padding strategy decoded from the options buffer.
  fn padding_strategy(&self) -> PaddingStrategy {
    self.padding_strategy
  }
}
impl Drop for Http2Options {
  /// Releases the nghttp2 options object, if one was allocated.
  fn drop(&mut self) {
    if self.options.is_null() {
      return;
    }
    // SAFETY: the pointer is non-null, owned by `self`, and freed only here.
    unsafe { ffi::nghttp2_option_del(self.options) };
  }
}
/// Capacity of `Http2Settings::entries`: one slot per indexed setting plus one
/// per allowed additional setting.
const SETTINGS_ENTRY_COUNT: usize =
SettingsIndex::Count as usize + MAX_ADDITIONAL_SETTINGS;
/// Snapshot of the settings buffer converted into nghttp2 settings entries,
/// ready to be submitted on a session.
struct Http2Settings {
// Fixed-capacity storage; only the first `count` entries are meaningful.
entries: [ffi::nghttp2_settings_entry; SETTINGS_ENTRY_COUNT],
// Number of populated entries in `entries`.
count: usize,
// Session the entries are submitted on; dereferenced in `send`.
session: *mut Session,
}
impl Http2Settings {
fn init(session: *mut Session) -> Self {
with_settings(|buffer| {
let flags = buffer[SettingsIndex::Count as usize];
let mut count: usize = 0;
let mut entries = [ffi::nghttp2_settings_entry {
settings_id: 0,
value: 0,
}; SETTINGS_ENTRY_COUNT];
macro_rules! grab_setting {
($index:expr, $nghttp2_id:expr) => {
if flags & (1 << $index as u8) != 0 {
let val = buffer[$index as usize];
if count < entries.len() {
entries[count] = ffi::nghttp2_settings_entry {
settings_id: $nghttp2_id as _,
value: val,
};
count += 1;
}
}
};
}
grab_setting!(
SettingsIndex::HeaderTableSize,
ffi::NGHTTP2_SETTINGS_HEADER_TABLE_SIZE
);
grab_setting!(
SettingsIndex::EnablePush,
ffi::NGHTTP2_SETTINGS_ENABLE_PUSH
);
grab_setting!(
SettingsIndex::InitialWindowSize,
ffi::NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE
);
grab_setting!(
SettingsIndex::MaxFrameSize,
ffi::NGHTTP2_SETTINGS_MAX_FRAME_SIZE
);
grab_setting!(
SettingsIndex::MaxConcurrentStreams,
ffi::NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS
);
grab_setting!(
SettingsIndex::MaxHeaderListSize,
ffi::NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE
);
let num_add_settings = buffer[SettingsIndex::Count as usize + 1] as usize;
if num_add_settings > 0 {
let offset = SettingsIndex::Count as usize + 2;
for i in 0..num_add_settings {
let key = buffer[offset + i * 2];
let val = buffer[offset + i * 2 + 1];
if count < entries.len() {
entries[count] = ffi::nghttp2_settings_entry {
settings_id: key as i32,
value: val,
};
count += 1;
}
}
}
Self {
session,
entries,
count,
}
})
}
fn send(&self) {
unsafe {
let session = &*self.session;
ffi::nghttp2_submit_settings(
session.session,
ffi::NGHTTP2_FLAG_NONE as _,
self.entries.as_ptr(),
self.count,
);
}
}
}
/// Returns the stream id a frame refers to: the promised stream id for
/// PUSH_PROMISE frames, the header's stream id for everything else.
///
/// `frame` must point to a valid nghttp2 frame.
fn frame_id(frame: *const ffi::nghttp2_frame) -> i32 {
  // SAFETY: callers pass the frame pointer nghttp2 handed to a callback.
  let frame = unsafe { &*frame };
  // SAFETY: `hd` is valid to read for every frame type.
  let kind = unsafe { frame.hd.type_ } as u32;
  if kind == ffi::NGHTTP2_PUSH_PROMISE as u32 {
    // SAFETY: the type check above selects the `push_promise` view.
    unsafe { frame.push_promise.promised_stream_id }
  } else {
    // SAFETY: `hd` is valid to read for every frame type.
    unsafe { frame.hd.stream_id }
  }
}
/// Returns the frame type byte from the frame header.
///
/// `frame` must point to a valid nghttp2 frame.
fn frame_type(frame: *const ffi::nghttp2_frame) -> u8 {
  // SAFETY: callers pass the frame pointer nghttp2 handed to a callback.
  let header = unsafe { &(*frame).hd };
  header.type_
}
/// Returns the flags byte from the frame header.
///
/// `frame` must point to a valid nghttp2 frame.
fn frame_flags(frame: *const ffi::nghttp2_frame) -> u8 {
  // SAFETY: callers pass the frame pointer nghttp2 handed to a callback.
  let header = unsafe { &(*frame).hd };
  header.flags
}
/// Returns the headers category stored in the frame's `headers` view.
///
/// `frame` must point to a valid nghttp2 frame. The value is only read, not
/// validated against the frame type.
fn frame_headers_category(
  frame: *const ffi::nghttp2_frame,
) -> ffi::nghttp2_headers_category {
  // SAFETY: callers pass the frame pointer nghttp2 handed to a callback.
  let headers = unsafe { &(*frame).headers };
  headers.cat
}
/// Views the bytes of an nghttp2 reference-counted buffer as a slice.
///
/// NOTE(review): the `'static` lifetime is not backed by anything visible
/// here; callers presumably must not keep the slice longer than the rcbuf is
/// valid — confirm.
fn rcbuf_to_slice(rcbuf: *mut ffi::nghttp2_rcbuf) -> &'static [u8] {
  // SAFETY: callers pass an rcbuf received from an nghttp2 callback.
  let view = unsafe { ffi::nghttp2_rcbuf_get_buf(rcbuf) };
  // SAFETY: `base`/`len` were just reported by nghttp2 for this rcbuf.
  unsafe { std::slice::from_raw_parts(view.base, view.len) }
}
/// Returns the `length` field from the frame header.
///
/// `frame` must point to a valid nghttp2 frame.
fn frame_header_length(frame: *const ffi::nghttp2_frame) -> usize {
  // SAFETY: callers pass the frame pointer nghttp2 handed to a callback.
  let header = unsafe { &(*frame).hd };
  header.length
}
/// nghttp2 callback invoked when a header block starts.
///
/// A stream that is already tracked just begins a new header block. An
/// unknown stream id is refused with `NGHTTP2_REFUSED_STREAM` while the
/// session is gracefully closing; otherwise a new `Http2Stream` is created,
/// started, and registered in the session's stream table.
unsafe extern "C" fn on_begin_headers_callbacks(
  ng_session: *mut ffi::nghttp2_session,
  frame: *const ffi::nghttp2_frame,
  data: *mut c_void,
) -> i32 {
  // SAFETY: `data` is the user-data pointer the session was created with.
  let session = unsafe { Session::from_user_data(data) };
  let stream_id = frame_id(frame);
  let category = frame_headers_category(frame);

  if let Some(existing) = session.find_stream(stream_id) {
    existing.start_headers(category);
    return 0;
  }

  if session.is_graceful_closing() {
    // SAFETY: `ng_session` is the live session invoking this callback.
    unsafe {
      ffi::nghttp2_submit_rst_stream(
        ng_session,
        ffi::NGHTTP2_FLAG_NONE as u8,
        stream_id,
        ffi::NGHTTP2_REFUSED_STREAM as u32,
      );
    }
    return ffi::NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE as i32;
  }

  let (obj, stream) = Http2Stream::new(session, stream_id, category);
  stream.start_headers(category);
  session.streams.insert(stream_id, (obj, stream));
  0
}
/// nghttp2 callback invoked for every decoded header name/value pair.
///
/// The pair is appended to the stream it belongs to. When the stream rejects
/// it (`add_header` returns `false`), the stream is reset with
/// `NGHTTP2_ENHANCE_YOUR_CALM` and a temporal failure is reported. Headers for
/// streams that are not tracked are ignored.
unsafe extern "C" fn on_header_callback(
  _session: *mut ffi::nghttp2_session,
  frame: *const ffi::nghttp2_frame,
  name: *mut ffi::nghttp2_rcbuf,
  value: *mut ffi::nghttp2_rcbuf,
  flags: u8,
  data: *mut c_void,
) -> i32 {
  // SAFETY: `data` is the user-data pointer the session was created with.
  let session = unsafe { Session::from_user_data(data) };
  let stream_id = frame_id(frame);
  let Some(stream) = session.find_stream(stream_id) else {
    return 0;
  };

  let header_name = rcbuf_to_slice(name);
  let header_value = rcbuf_to_slice(value);
  if stream.add_header(header_name, header_value, flags) {
    return 0;
  }

  // SAFETY: `session.session` is the nghttp2 session owned by `session`.
  unsafe {
    ffi::nghttp2_submit_rst_stream(
      session.session,
      ffi::NGHTTP2_FLAG_NONE as u8,
      stream_id,
      ffi::NGHTTP2_ENHANCE_YOUR_CALM as u32,
    );
  }
  ffi::NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE as i32
}
/// nghttp2 callback invoked once a complete frame has been received.
///
/// Dispatches on the frame type to the matching `handle_*` function. DATA and
/// HEADERS/PUSH_PROMISE frames with END_STREAM additionally signal
/// end-of-stream; SETTINGS frames are only reported when they are not ACKs.
/// Unknown frame types are ignored. Always returns 0.
unsafe extern "C" fn on_frame_recv_callback(
  _session: *mut ffi::nghttp2_session,
  frame: *const ffi::nghttp2_frame,
  data: *mut c_void,
) -> i32 {
  // SAFETY: `data` is the user-data pointer the session was created with.
  let session = unsafe { Session::from_user_data(data) };
  let kind = frame_type(frame) as u32;
  let flags = frame_flags(frame);
  #[allow(clippy::unnecessary_cast, reason = "cast needed for type alignment")]
  {
    const DATA: u32 = ffi::NGHTTP2_DATA as u32;
    const PUSH_PROMISE: u32 = ffi::NGHTTP2_PUSH_PROMISE as u32;
    const HEADERS: u32 = ffi::NGHTTP2_HEADERS as u32;
    const SETTINGS: u32 = ffi::NGHTTP2_SETTINGS as u32;
    const PRIORITY: u32 = ffi::NGHTTP2_PRIORITY as u32;
    const GOAWAY: u32 = ffi::NGHTTP2_GOAWAY as u32;
    const PING: u32 = ffi::NGHTTP2_PING as u32;
    const ALTSVC: u32 = ffi::NGHTTP2_ALTSVC as u32;
    const ORIGIN: u32 = ffi::NGHTTP2_ORIGIN as u32;

    let end_stream = (flags & ffi::NGHTTP2_FLAG_END_STREAM as u8) != 0;
    let is_ack = (flags & ffi::NGHTTP2_FLAG_ACK as u8) != 0;

    match kind {
      DATA => {
        if end_stream {
          handle_data_end_stream(session, frame);
        }
      }
      PUSH_PROMISE | HEADERS => {
        handle_headers_frame(session, frame);
        if end_stream {
          handle_data_end_stream(session, frame);
        }
      }
      SETTINGS => {
        if !is_ack {
          handle_settings_frame(session);
        }
      }
      PRIORITY => handle_priority_frame(session, frame),
      GOAWAY => handle_goaway_frame(session, frame),
      PING => handle_ping_frame(session),
      ALTSVC => handle_alt_svc_frame(session, frame),
      ORIGIN => handle_origin_frame(session, frame),
      _ => {}
    }
  }
  0
}
/// Signals end-of-stream to JavaScript for the stream `frame` belongs to.
///
/// Looks up the stream's JS object and, if it has an `onread` function, calls
/// it with `(undefined, -4095)`. Does nothing when the stream is not tracked
/// or `onread` is missing or not a function.
fn handle_data_end_stream(session: &Session, frame: *const ffi::nghttp2_frame) {
let id = frame_id(frame);
let Some(stream_obj) = session.find_stream_obj(id) else {
return;
};
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let handle = v8::Local::new(scope, stream_obj);
let onread_key = v8::String::new(scope, "onread").unwrap();
let Some(onread_val) = handle.get(scope, onread_key.into()) else {
return;
};
let Ok(onread_fn) = v8::Local::<v8::Function>::try_from(onread_val) else {
return;
};
// NOTE(review): -4095 presumably mirrors libuv's UV_EOF — confirm against
// the JS side that interprets it.
let eof = v8::Number::new(scope, -4095.0);
let undef = v8::undefined(scope);
onread_fn.call(scope, handle.into(), &[undef.into(), eof.into()]);
}
/// Delivers the headers collected for a HEADERS/PUSH_PROMISE frame to the
/// session's `headers_frame_cb`.
///
/// The collected headers are flattened into a JS array of alternating
/// name/value strings and then cleared from the stream. The callback receives
/// `(streamObject, streamId, null, frameFlags, headersArray)`. Nothing happens
/// when the stream is not tracked or has no collected headers.
fn handle_headers_frame(session: &Session, frame: *const ffi::nghttp2_frame) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let id = frame_id(frame);
let Some(stream_ref) = session.find_stream(id) else {
return;
};
let headers = stream_ref.current_headers.borrow();
if headers.is_empty() {
return;
}
// Flatten into [name0, value0, name1, value1, ...].
let headers_array = v8::Array::new(scope, (headers.len() * 2) as i32);
for (i, (name, value, _flags)) in headers.iter().enumerate() {
let name_str = v8::String::new(scope, name).unwrap();
let value_str = v8::String::new(scope, value).unwrap();
headers_array.set_index(scope, (i * 2) as u32, name_str.into());
headers_array.set_index(scope, (i * 2 + 1) as u32, value_str.into());
}
// Release the borrow on `current_headers` before clearing the headers.
drop(headers);
stream_ref.clear_headers();
// The stream was found above, so its JS object is in the same map entry.
let stream_obj = session.find_stream_obj(id).unwrap();
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.headers_frame_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
let handle = v8::Local::new(scope, stream_obj);
let id_num = v8::Number::new(scope, id.into());
// The category argument is always passed as `null`.
let cat = v8::null(scope);
let flags = v8::Number::new(scope, frame_flags(frame).into());
callback.call(
scope,
recv.into(),
&[
handle.into(),
id_num.into(),
cat.into(),
flags.into(),
headers_array.into(),
],
);
}
/// Calls the session's `settings_frame_cb` with no arguments, using the
/// session's JS object as the receiver.
fn handle_settings_frame(session: &Session) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.settings_frame_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
callback.call(scope, recv.into(), &[]);
}
/// Calls the session's `ping_frame_cb` with a single `null` argument, using
/// the session's JS object as the receiver. The ping payload is not passed.
fn handle_ping_frame(session: &Session) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.ping_frame_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
let arg = v8::null(scope);
callback.call(scope, recv.into(), &[arg.into()]);
}
/// Reports a received GOAWAY frame to the session's `goaway_data_cb`.
///
/// The callback receives `(errorCode, lastStreamId, opaqueData)`, where
/// `opaqueData` is a `Uint8Array` copy of the frame's opaque data, or
/// `undefined` when the frame carries none.
fn handle_goaway_frame(session: &Session, frame: *const ffi::nghttp2_frame) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let goaway_frame = unsafe { (*frame).goaway };
let error_code = v8::Number::new(scope, goaway_frame.error_code.into());
let last_stream_id =
v8::Number::new(scope, goaway_frame.last_stream_id.into());
let opaque_data: v8::Local<v8::Value> = if goaway_frame.opaque_data_len > 0 {
let data_slice = unsafe {
std::slice::from_raw_parts(
goaway_frame.opaque_data,
goaway_frame.opaque_data_len,
)
};
// Copy the bytes into a fresh ArrayBuffer rather than aliasing the frame's
// memory.
let array_buffer = v8::ArrayBuffer::new(scope, data_slice.len());
let backing_store = array_buffer.get_backing_store();
if let Some(backing_data) = backing_store.data() {
unsafe {
std::ptr::copy_nonoverlapping(
data_slice.as_ptr(),
backing_data.as_ptr() as *mut u8,
data_slice.len(),
);
}
}
v8::Uint8Array::new(scope, array_buffer, 0, data_slice.len())
.unwrap()
.into()
} else {
v8::undefined(scope).into()
};
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.goaway_data_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
callback.call(
scope,
recv.into(),
&[error_code.into(), last_stream_id.into(), opaque_data],
);
}
/// Reports a received PRIORITY frame to the session's `priority_frame_cb`.
///
/// The callback receives `(streamId, parentStreamId, weight, exclusive)`,
/// taken from the frame's priority spec; `exclusive` is a boolean.
fn handle_priority_frame(session: &Session, frame: *const ffi::nghttp2_frame) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let priority_frame = unsafe { (*frame).priority };
let id = frame_id(frame);
let spec = priority_frame.pri_spec;
let stream_id = v8::Number::new(scope, id.into());
let parent_stream_id = v8::Number::new(scope, spec.stream_id.into());
let weight = v8::Number::new(scope, spec.weight.into());
let exclusive = v8::Boolean::new(scope, spec.exclusive != 0);
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.priority_frame_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
callback.call(
scope,
recv.into(),
&[
stream_id.into(),
parent_stream_id.into(),
weight.into(),
exclusive.into(),
],
);
}
/// Reports a received ALTSVC extension frame to the session's `alt_svc_cb`.
///
/// The callback receives `(streamId, origin, fieldValue)`. Either string is
/// replaced by an empty string when its bytes are not valid UTF-8.
fn handle_alt_svc_frame(session: &Session, frame: *const ffi::nghttp2_frame) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let id = frame_id(frame);
let ext = unsafe { (*frame).ext };
// The extension payload is interpreted as an `nghttp2_ext_altsvc`.
let altsvc = ext.payload as *const ffi::nghttp2_ext_altsvc;
let origin_slice = unsafe {
std::slice::from_raw_parts((*altsvc).origin, (*altsvc).origin_len)
};
let field_value_slice = unsafe {
std::slice::from_raw_parts((*altsvc).field_value, (*altsvc).field_value_len)
};
let origin_str = std::str::from_utf8(origin_slice)
.map(|s| v8::String::new(scope, s).unwrap())
.unwrap_or_else(|_| v8::String::new(scope, "").unwrap());
let field_value_str = std::str::from_utf8(field_value_slice)
.map(|s| v8::String::new(scope, s).unwrap())
.unwrap_or_else(|_| v8::String::new(scope, "").unwrap());
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.alt_svc_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
let stream_id = v8::Number::new(scope, id.into());
callback.call(
scope,
recv.into(),
&[stream_id.into(), origin_str.into(), field_value_str.into()],
);
}
/// Reports a received ORIGIN extension frame to the session's
/// `origin_frame_cb`.
///
/// The callback receives one JS array with an element per origin entry.
/// Entries whose bytes are not valid UTF-8 are skipped, leaving that index
/// unset. Nothing is reported when the frame lists no origins.
fn handle_origin_frame(session: &Session, frame: *const ffi::nghttp2_frame) {
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let ext = unsafe { (*frame).ext };
// The extension payload is interpreted as an `nghttp2_ext_origin`.
let origin = ext.payload as *const ffi::nghttp2_ext_origin;
let nov = unsafe { (*origin).nov };
let origins_ptr = unsafe { (*origin).ov };
if nov == 0 {
return;
}
let origins_array = v8::Array::new(scope, nov as i32);
for i in 0..nov {
let entry = unsafe { *origins_ptr.add(i) };
let origin_slice =
unsafe { std::slice::from_raw_parts(entry.origin, entry.origin_len) };
if let Ok(origin_str) = std::str::from_utf8(origin_slice) {
let js_string = v8::String::new(scope, origin_str).unwrap();
origins_array.set_index(scope, i as u32, js_string.into());
}
}
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let recv = v8::Local::new(scope, &session.this);
let callback = v8::Local::new(scope, &callbacks.origin_frame_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
callback.call(scope, recv.into(), &[origins_array.into()]);
}
/// nghttp2 callback invoked when a stream is closed.
///
/// Marks the tracked stream as closed by nghttp2 and calls `stream_close_cb`
/// with the error code, using the stream's JS object as the receiver. The
/// stream is removed from the session's table when the callback throws
/// (returns nothing) or returns `false`. Untracked streams are ignored.
/// Always returns 0.
unsafe extern "C" fn on_stream_close_callback(
_session: *mut ffi::nghttp2_session,
stream_id: i32,
error_code: u32,
data: *mut c_void,
) -> i32 {
let session = unsafe { Session::from_user_data(data) };
// Clone the global handle so it does not borrow from `session.streams`,
// which may be mutated below.
let Some(stream_obj) = session.find_stream_obj(stream_id).cloned() else {
return 0;
};
if let Some(stream) = session.find_stream(stream_id) {
*stream.closed_by_nghttp2.borrow_mut() = true;
}
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let state = session.op_state.borrow();
let callbacks = state.borrow::<SessionCallbacks>();
let callback = v8::Local::new(scope, &callbacks.stream_close_cb);
// Release the op-state borrow before calling into JavaScript.
drop(state);
let recv = v8::Local::new(scope, stream_obj);
let code = v8::Integer::new_from_unsigned(scope, error_code);
let result = callback.call(scope, recv.into(), &[code.into()]);
// No result, or an explicit `false`, means the stream entry is dropped here.
if result.is_none() || result.map(|v| v.is_false()).unwrap_or(false) {
session.streams.remove(&stream_id);
}
0
}
/// nghttp2 callback invoked for each chunk of DATA payload received.
///
/// Empty chunks are ignored. Otherwise the bytes are first marked as consumed
/// on both the connection and the stream, then copied into a new `Uint8Array`
/// and passed to the stream object's `onread` function as `(chunk, length)`.
/// The chunk is dropped when the stream is not tracked or has no `onread`
/// function. Always returns 0.
unsafe extern "C" fn on_data_chunk_recv_callback(
ng_session: *mut ffi::nghttp2_session,
_flags: u8,
stream_id: i32,
data_ptr: *const u8,
len: usize,
user_data: *mut c_void,
) -> i32 {
if len == 0 {
return 0;
}
// Consumption is reported before (and regardless of whether) the data is
// delivered to JavaScript.
unsafe {
ffi::nghttp2_session_consume_connection(ng_session, len);
ffi::nghttp2_session_consume_stream(ng_session, stream_id, len);
};
let session = unsafe { Session::from_user_data(user_data) };
let Some(stream_obj) = session.find_stream_obj(stream_id) else {
return 0;
};
// Enter the session's isolate and context before touching any JS values.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(session.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, session.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let handle = v8::Local::new(scope, stream_obj);
let onread_key = v8::String::new(scope, "onread").unwrap();
let Some(onread_val) = handle.get(scope, onread_key.into()) else {
return 0;
};
let Ok(onread_fn) = v8::Local::<v8::Function>::try_from(onread_val) else {
return 0;
};
let data_slice = unsafe { std::slice::from_raw_parts(data_ptr, len) };
// Copy into a fresh ArrayBuffer; `len > 0` here, so the backing store has
// a data pointer to unwrap.
let ab = v8::ArrayBuffer::new(scope, len);
let backing_store = ab.get_backing_store();
let dst = backing_store.data().unwrap().as_ptr() as *mut u8;
unsafe { std::ptr::copy_nonoverlapping(data_slice.as_ptr(), dst, len) };
let uint8_array = v8::Uint8Array::new(scope, ab, 0, len).unwrap();
let nread = v8::Number::new(scope, len as f64);
onread_fn.call(scope, handle.into(), &[uint8_array.into(), nread.into()]);
0
}
/// nghttp2 data-source read callback: supplies outbound DATA bytes for a
/// stream.
///
/// Copies up to `length` bytes from the stream's `pending_data` into `buf` and
/// returns the number of bytes copied. Returns `NGHTTP2_ERR_DEFERRED` when the
/// stream is not tracked, or when it has nothing queued and its writable side
/// has not ended. Once the queue is drained and the writable side has ended,
/// `NGHTTP2_DATA_FLAG_EOF` is set; if the stream has trailers,
/// `NGHTTP2_DATA_FLAG_NO_END_STREAM` is also set and `on_trailers` is called.
/// `complete_shutdown` is called in both EOF cases.
pub unsafe extern "C" fn on_stream_read_callback(
_session: *mut ffi::nghttp2_session,
stream_id: i32,
buf: *mut u8,
length: usize,
data_flags: *mut u32,
_source: *mut ffi::nghttp2_data_source,
user_data: *mut c_void,
) -> CSsizeT {
let session = unsafe { Session::from_user_data(user_data) };
let mut amount: usize = 0;
let mut need_eof = false;
let mut need_trailers = false;
let mut is_deferred = false;
if let Some(stream) = session.find_stream(stream_id) {
let mut pending_data = stream.pending_data.borrow_mut();
if !pending_data.is_empty() {
let amt = std::cmp::min(pending_data.len(), length);
if amt > 0 {
// Move the first `amt` queued bytes into nghttp2's buffer.
let data_slice = pending_data.split_to(amt);
unsafe { std::ptr::copy_nonoverlapping(data_slice.as_ptr(), buf, amt) };
*stream.available_outbound_length.borrow_mut() -= amt;
amount = amt;
if pending_data.is_empty() && *stream.writable_ended.borrow() {
need_eof = true;
need_trailers = stream.has_trailers();
}
}
} else if *stream.writable_ended.borrow() {
// Nothing queued and the writer is done: finish with a zero-length read.
need_eof = true;
need_trailers = stream.has_trailers();
} else {
// Nothing queued yet, but more data may still be written.
is_deferred = true;
}
} else {
return ffi::NGHTTP2_ERR_DEFERRED as _;
}
if is_deferred {
return ffi::NGHTTP2_ERR_DEFERRED as _;
}
if need_eof {
unsafe { *data_flags |= ffi::NGHTTP2_DATA_FLAG_EOF as u32 };
if need_trailers {
unsafe { *data_flags |= ffi::NGHTTP2_DATA_FLAG_NO_END_STREAM as u32 };
if let Some(stream) = session.find_stream(stream_id) {
stream.on_trailers();
}
}
// The stream is looked up again because `on_trailers` ran in between.
if let Some(stream) = session.find_stream(stream_id) {
stream.complete_shutdown();
}
}
amount as CSsizeT
}
/// nghttp2 padding callback: returns the padded payload length for `frame`.
///
/// With no padding strategy the frame's own length is returned unchanged.
/// `Max` delegates to `Session::on_max_frame_size_padding`; `Aligned` and
/// `Callback` both delegate to `Session::on_dword_aligned_padding`.
unsafe extern "C" fn on_select_padding(
  _session: *mut ffi::nghttp2_session,
  frame: *const ffi::nghttp2_frame,
  max_payload_len: usize,
  user_data: *mut c_void,
) -> CSsizeT {
  // SAFETY: `user_data` is the pointer the session was created with.
  let session = unsafe { Session::from_user_data(user_data) };
  let frame_len = frame_header_length(frame);
  let padded_len = match session.padding_strategy {
    PaddingStrategy::None => frame_len,
    PaddingStrategy::Max => {
      session.on_max_frame_size_padding(frame_len, max_payload_len)
    }
    PaddingStrategy::Aligned | PaddingStrategy::Callback => {
      session.on_dword_aligned_padding(frame_len, max_payload_len)
    }
  };
  padded_len as CSsizeT
}
/// Placeholder for nghttp2's frame-not-sent notification: ignores every
/// argument and returns 0.
unsafe extern "C" fn on_frame_not_send_callback(
_session: *mut ffi::nghttp2_session,
_frame: *const ffi::nghttp2_frame,
_lib_error_code: i32,
_data: *mut c_void,
) -> i32 {
0
}
/// Placeholder for nghttp2's invalid-header notification: ignores every
/// argument and returns 0.
unsafe extern "C" fn on_invalid_header_callback(
_session: *mut ffi::nghttp2_session,
_frame: *const ffi::nghttp2_frame,
_name: *mut ffi::nghttp2_rcbuf,
_value: *mut ffi::nghttp2_rcbuf,
_flags: u8,
_data: *mut c_void,
) -> i32 {
0
}
/// Placeholder for nghttp2's error-message notification: the error code and
/// message are discarded and 0 is returned.
unsafe extern "C" fn on_nghttp_error_callback(
_session: *mut ffi::nghttp2_session,
_lib_error_code: i32,
_msg: *const std::ffi::c_char,
_len: usize,
_data: *mut c_void,
) -> i32 {
0
}
/// Placeholder for nghttp2's send-data callback: ignores every argument,
/// writes nothing, and returns 0.
unsafe extern "C" fn on_send_data_callback(
_session: *mut ffi::nghttp2_session,
_frame: *mut ffi::nghttp2_frame,
_framehd: *const u8,
_length: usize,
_source: *mut ffi::nghttp2_data_source,
_data: *mut c_void,
) -> i32 {
0
}
/// Placeholder for nghttp2's invalid-frame notification: ignores every
/// argument and returns 0.
unsafe extern "C" fn on_invalid_frame_recv_callback(
_session: *mut ffi::nghttp2_session,
_frame: *const ffi::nghttp2_frame,
_lib_error_code: i32,
_data: *mut c_void,
) -> i32 {
0
}
/// Placeholder for nghttp2's frame-sent notification: ignores every argument
/// and returns 0.
unsafe extern "C" fn on_frame_send_callback(
_session: *mut ffi::nghttp2_session,
_frame: *const ffi::nghttp2_frame,
_data: *mut c_void,
) -> i32 {
0
}
fn create_callbacks() -> *mut ffi::nghttp2_session_callbacks {
let mut callbacks: *mut ffi::nghttp2_session_callbacks = std::ptr::null_mut();
unsafe {
assert_eq!(ffi::nghttp2_session_callbacks_new(&mut callbacks), 0);
ffi::nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks,
Some(on_begin_headers_callbacks),
);
ffi::nghttp2_session_callbacks_set_on_header_callback2(
callbacks,
Some(on_header_callback),
);
ffi::nghttp2_session_callbacks_set_on_frame_recv_callback(
callbacks,
Some(on_frame_recv_callback),
);
ffi::nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks,
Some(on_stream_close_callback),
);
ffi::nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks,
Some(on_data_chunk_recv_callback),
);
ffi::nghttp2_session_callbacks_set_on_frame_not_send_callback(
callbacks,
Some(on_frame_not_send_callback),
);
ffi::nghttp2_session_callbacks_set_on_invalid_header_callback2(
callbacks,
Some(on_invalid_header_callback),
);
ffi::nghttp2_session_callbacks_set_error_callback2(
callbacks,
Some(on_nghttp_error_callback),
);
ffi::nghttp2_session_callbacks_set_send_data_callback(
callbacks,
Some(on_send_data_callback),
);
ffi::nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(
callbacks,
Some(on_invalid_frame_recv_callback),
);
ffi::nghttp2_session_callbacks_set_on_frame_send_callback(
callbacks,
Some(on_frame_send_callback),
);
ffi::nghttp2_session_callbacks_set_select_padding_callback2(
callbacks,
Some(on_select_padding),
);
}
callbacks
}
/// JavaScript callbacks used by HTTP/2 sessions.
///
/// The frame handlers in this file fetch this struct from `OpState` and call
/// the matching function. Every field is a `v8::Global`, which keeps the
/// function alive for as long as the struct exists.
#[allow(dead_code, reason = "fields are stored for prevent GC of v8 handles")]
pub struct SessionCallbacks {
pub session_internal_error_cb: v8::Global<v8::Function>,
// Called by `handle_priority_frame`.
pub priority_frame_cb: v8::Global<v8::Function>,
// Called by `handle_settings_frame`.
pub settings_frame_cb: v8::Global<v8::Function>,
// Called by `handle_ping_frame`.
pub ping_frame_cb: v8::Global<v8::Function>,
// Called by `handle_headers_frame`.
pub headers_frame_cb: v8::Global<v8::Function>,
pub frame_error_cb: v8::Global<v8::Function>,
// Called by `handle_goaway_frame`.
pub goaway_data_cb: v8::Global<v8::Function>,
// Called by `handle_alt_svc_frame`.
pub alt_svc_cb: v8::Global<v8::Function>,
pub stream_trailers_cb: v8::Global<v8::Function>,
// Called by `on_stream_close_callback`.
pub stream_close_cb: v8::Global<v8::Function>,
// Called by `handle_origin_frame`.
pub origin_frame_cb: v8::Global<v8::Function>,
}
/// A chunk of bytes queued in `Session::outgoing_buffers`, tagged with a
/// stream id.
#[derive(Debug)]
pub struct NgHttp2StreamWrite {
// The queued bytes.
pub data: bytes::Bytes,
// Stream id recorded alongside the bytes.
#[allow(dead_code, reason = "stored for debugging")]
pub stream_id: i32,
}
impl NgHttp2StreamWrite {
pub fn new(data: bytes::Bytes, stream_id: i32) -> Self {
Self { data, stream_id }
}
pub fn len(&self) -> usize {
self.data.len()
}
}
/// State of one HTTP/2 session: the nghttp2 session, its tracked streams, and
/// the V8 handles needed to call back into JavaScript.
pub struct Session {
// Underlying nghttp2 session.
pub session: *mut ffi::nghttp2_session,
// Tracked streams by id: the JS stream object and the native stream.
pub streams: HashMap<i32, (v8::Global<v8::Object>, cppgc::Ref<Http2Stream>)>,
// Queued outbound writes; see `push_outgoing_buffer` / `clear_outgoing`.
pub outgoing_buffers: Vec<NgHttp2StreamWrite>,
// Total byte length of `outgoing_buffers`.
pub outgoing_length: usize,
// Isolate and context entered before calling into JavaScript.
pub isolate: v8::UnsafeRawIsolatePtr,
pub context: v8::Global<v8::Context>,
// Holds the `SessionCallbacks` used by the frame handlers.
pub op_state: Rc<RefCell<OpState>>,
// JS object used as the receiver for session-level callbacks.
pub this: v8::Global<v8::Object>,
// Consulted by `on_select_padding`.
pub padding_strategy: PaddingStrategy,
// Set by `start_graceful_close`; new streams are refused while it is set.
pub graceful_close_initiated: bool,
// NOTE(review): presumably the transport stream this session reads from and
// writes to — confirm where it is assigned.
pub stream: Option<*mut deno_core::uv_compat::UvStream>,
// While set, `submit_rst_stream` queues resets instead of submitting them.
pub is_sending: bool,
pub pending_destroy: bool,
// `(stream_id, error_code)` resets queued while `is_sending` was set.
pub pending_rst_streams: Vec<(i32, u32)>,
}
impl Session {
/// Returns the native stream registered under `id`, or `None` when the id is
/// unknown.
pub fn find_stream(&self, id: i32) -> Option<&cppgc::Ref<Http2Stream>> {
  let (_, stream) = self.streams.get(&id)?;
  Some(stream)
}
/// Returns the JS object handle of the stream registered under `id`, or
/// `None` when the id is unknown.
pub fn find_stream_obj(&self, id: i32) -> Option<&v8::Global<v8::Object>> {
  let (object, _) = self.streams.get(&id)?;
  Some(object)
}
/// Queues `write` for the next flush and adds its payload size to
/// `outgoing_length`.
pub fn push_outgoing_buffer(&mut self, write: NgHttp2StreamWrite) {
  let added = write.len();
  self.outgoing_length += added;
  self.outgoing_buffers.push(write);
}
/// Drops every queued write and resets the byte counter to zero.
pub fn clear_outgoing(&mut self) {
  self.outgoing_length = 0;
  self.outgoing_buffers.clear();
}
/// Submits an RST_STREAM frame for `stream_id` with error `code`.
///
/// While `is_sending` is raised the request is only recorded in
/// `pending_rst_streams` (it is replayed by `flush_pending_rst_streams`).
/// Otherwise pending output is flushed first and the frame is submitted only
/// if nghttp2 still knows the stream.
pub fn submit_rst_stream(&mut self, stream_id: i32, code: u32) {
  if self.is_sending {
    self.pending_rst_streams.push((stream_id, code));
    return;
  }
  self.send_pending_data();
  // Relies on `self.session` being the live nghttp2 session of this object.
  let known_to_nghttp2 = unsafe {
    !ffi::nghttp2_session_find_stream(self.session, stream_id).is_null()
  };
  if known_to_nghttp2 {
    unsafe {
      ffi::nghttp2_submit_rst_stream(
        self.session,
        ffi::NGHTTP2_FLAG_NONE as u8,
        stream_id,
        code,
      );
    }
  }
}
/// Replays the RST_STREAM requests that `submit_rst_stream` deferred.
///
/// The queue is taken up front, so the nested `send_pending_data` call (which
/// itself ends by calling this function) finds it empty and returns at once.
fn flush_pending_rst_streams(&mut self) {
  if self.pending_rst_streams.is_empty() {
    return;
  }
  let deferred = std::mem::take(&mut self.pending_rst_streams);
  self.send_pending_data();
  for (stream_id, code) in deferred {
    let gone = unsafe {
      ffi::nghttp2_session_find_stream(self.session, stream_id).is_null()
    };
    if !gone {
      unsafe {
        ffi::nghttp2_submit_rst_stream(
          self.session,
          ffi::NGHTTP2_FLAG_NONE as u8,
          stream_id,
          code,
        );
      }
    }
  }
}
/// Returns `true` once a graceful close has been requested on this session.
pub fn is_graceful_closing(&self) -> bool {
self.graceful_close_initiated
}
/// Marks the session as gracefully closing. Only the flag is set here;
/// nothing is submitted or sent.
pub fn start_graceful_close(&mut self) {
self.graceful_close_initiated = true;
}
/// Number of streams currently registered in `streams`.
pub fn active_stream_count(&self) -> usize {
self.streams.len()
}
/// Calls the JS `ongracefulclosecomplete` handler once a graceful close has
/// drained.
///
/// Does nothing unless a graceful close was requested and nghttp2 reports
/// that it neither wants to write nor wants to read. The handler is looked up
/// as a property of `this` and invoked with no arguments; a missing or
/// non-function property is silently ignored.
pub fn maybe_notify_graceful_close_complete(&mut self) {
if !self.graceful_close_initiated {
return;
}
let want_write =
unsafe { ffi::nghttp2_session_want_write(self.session) };
let want_read =
unsafe { ffi::nghttp2_session_want_read(self.session) };
// Still has frames to send or expects more input: not finished yet.
if want_write != 0 || want_read != 0 {
return;
}
// Re-enter V8 from native code using the isolate and context captured when
// the session was created.
let mut isolate =
unsafe { v8::Isolate::from_raw_isolate_ptr(self.isolate) };
v8::scope!(let scope, &mut isolate);
let context = v8::Local::new(scope, self.context.clone());
let scope = &mut v8::ContextScope::new(scope, context);
let this_local = v8::Local::new(scope, &self.this);
let key = v8::String::new(scope, "ongracefulclosecomplete").unwrap();
if let Some(Ok(cb)) = this_local
.get(scope, key.into())
.map(v8::Local::<v8::Function>::try_from)
{
cb.call(scope, this_local.into(), &[]);
}
}
/// Reinterprets a callback `user_data` pointer as the `Session` it points to.
///
/// # Safety
///
/// `user_data` must point to a live `Session`, and the caller must make sure
/// no other reference to that `Session` is in use for the chosen lifetime
/// `'a`.
pub unsafe fn from_user_data<'a>(user_data: *mut c_void) -> &'a mut Self {
  let session = user_data.cast::<Session>();
  unsafe { &mut *session }
}
pub fn send_pending_data(&mut self) {
if self.is_sending {
return;
}
let stream = match self.stream {
Some(stream) => stream,
None => return,
};
let handle_type =
unsafe { (*(stream as *mut deno_core::uv_compat::UvHandle)).r#type };
if handle_type != deno_core::uv_compat::uv_handle_type::UV_TCP {
self.stream = None;
return;
}
self.is_sending = true;
loop {
let mut src = std::ptr::null();
let src_len =
unsafe { ffi::nghttp2_session_mem_send(self.session, &mut src) };
if src_len > 0 {
let data = unsafe { std::slice::from_raw_parts(src, src_len as usize) };
let data_copy = data.to_vec();
let write_req = Box::new(H2WriteReq {
uv_req: deno_core::uv_compat::new_write(),
data: data_copy,
});
let write_ptr = Box::into_raw(write_req);
unsafe {
let buf = deno_core::uv_compat::UvBuf {
base: (*write_ptr).data.as_ptr() as *mut _,
len: (*write_ptr).data.len(),
};
let ret = deno_core::uv_compat::uv_write(
&mut (*write_ptr).uv_req,
stream,
&buf,
1,
Some(h2_write_cb),
);
if ret != 0 {
let _ = Box::from_raw(write_ptr);
}
}
} else {
break;
}
}
self.is_sending = false;
if !self.outgoing_buffers.is_empty() {
for buffer in &self.outgoing_buffers {
let data = buffer.data.as_ref();
let data_copy = data.to_vec();
let write_req = Box::new(H2WriteReq {
uv_req: deno_core::uv_compat::new_write(),
data: data_copy,
});
let write_ptr = Box::into_raw(write_req);
unsafe {
let buf = deno_core::uv_compat::UvBuf {
base: (*write_ptr).data.as_ptr() as *mut _,
len: (*write_ptr).data.len(),
};
let ret = deno_core::uv_compat::uv_write(
&mut (*write_ptr).uv_req,
stream,
&buf,
1,
Some(h2_write_cb),
);
if ret != 0 {
let _ = Box::from_raw(write_ptr);
}
}
}
self.clear_outgoing();
}
self.maybe_notify_graceful_close_complete();
self.flush_pending_rst_streams();
}
/// Feeds bytes read from the transport into nghttp2 and flushes any output
/// that produces.
///
/// `is_sending` is raised around `nghttp2_session_mem_recv`, so RST_STREAM
/// submissions and `destroy` calls made in the meantime are deferred. A
/// deferred destroy is carried out at the end: reading is stopped and the
/// stream is shut down, or closed directly if the shutdown request cannot be
/// queued. The return value of `nghttp2_session_mem_recv` is not inspected.
pub fn receive_data(&mut self, data: &[u8]) {
  if data.is_empty() {
    return;
  }
  self.is_sending = true;
  unsafe {
    ffi::nghttp2_session_mem_recv(
      self.session,
      data.as_ptr() as _,
      data.len(),
    );
  }
  self.is_sending = false;
  self.send_pending_data();

  if !self.pending_destroy {
    return;
  }
  self.pending_destroy = false;
  let Some(uv_stream) = self.stream.take() else {
    return;
  };
  unsafe {
    deno_core::uv_compat::uv_read_stop(uv_stream);
    let shutdown_req =
      Box::into_raw(Box::new(deno_core::uv_compat::new_shutdown()));
    let status = deno_core::uv_compat::uv_shutdown(
      shutdown_req,
      uv_stream,
      Some(h2_shutdown_cb),
    );
    if status != 0 {
      // The request was rejected: reclaim it and close the handle instead.
      drop(Box::from_raw(shutdown_req));
      deno_core::uv_compat::uv_close(
        uv_stream as *mut deno_core::uv_compat::UvHandle,
        Some(h2_stream_close_cb),
      );
    }
  }
}
/// Picks a padded payload length such that the payload plus 9 bytes is a
/// multiple of 8.
///
/// Returns `frame_len` unchanged when it is already aligned; otherwise rounds
/// up to the next aligned length, capped at `max_payload_len` (in which case
/// the result may not be aligned).
pub fn on_dword_aligned_padding(
  &self,
  frame_len: usize,
  max_payload_len: usize,
) -> usize {
  match (frame_len + 9) % 8 {
    0 => frame_len,
    remainder => max_payload_len.min(frame_len + (8 - remainder)),
  }
}
/// Padding choice that always selects the largest allowed payload:
/// returns `max_payload_len` and ignores the unpadded frame length.
pub fn on_max_frame_size_padding(
&self,
_frame_len: usize,
max_payload_len: usize,
) -> usize {
max_payload_len
}
}
/// Snapshot of nghttp2 session counters returned by
/// `Http2Session::get_state`, serialized with camelCase keys.
///
/// Each field is filled from the `nghttp2_session_get_*` function of the same
/// name and converted to `f64`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Http2SessionState {
pub effective_local_window_size: f64,
pub effective_recv_data_length: f64,
pub next_stream_id: f64,
pub local_window_size: f64,
pub last_proc_stream_id: f64,
pub remote_window_size: f64,
pub outbound_queue_size: f64,
pub hd_deflate_dynamic_table_size: f64,
pub hd_inflate_dynamic_table_size: f64,
}
/// Garbage-collected wrapper object exposed to JavaScript for one HTTP/2
/// session.
pub struct Http2Session {
/// Whether this is a server or a client session.
#[allow(dead_code, reason = "stored for future use")]
type_: SessionType,
/// Raw nghttp2 session handle (the same pointer is stored in
/// `Session::session`).
session: *mut ffi::nghttp2_session,
/// Callback table pointer.
/// NOTE(review): `create` currently stores a null pointer here.
#[allow(dead_code, reason = "owns the allocation to prevent premature free")]
callbacks: *mut ffi::nghttp2_session_callbacks,
/// Heap-allocated shared state, created with `Box::into_raw` in `create`;
/// also registered as the nghttp2 user data.
pub(crate) inner: *mut Session,
}
// Registers `Http2Session` as a cppgc-managed type.
unsafe impl deno_core::GarbageCollected for Http2Session {
// Nothing is reported to the visitor: the struct holds only raw pointers and
// a plain enum value.
fn trace(&self, _: &mut v8::cppgc::Visitor) {}
// Type name reported for this object.
fn get_name(&self) -> &'static std::ffi::CStr {
c"Http2Session"
}
}
impl Http2Session {
/// Allocates the shared `Session` state and creates the underlying nghttp2
/// session of the requested type.
///
/// * `this` - the JS object that wraps the session; kept in `Session::this`.
/// * `isolate` / `scope` - used to capture the raw isolate pointer and the
///   current context so native code can re-enter V8 later.
/// * `op_state` - shared op state stored in the session.
/// * `session_type` - selects `nghttp2_session_server_new3` or
///   `nghttp2_session_client_new3`.
///
/// # Panics
///
/// Panics if nghttp2 fails to create the session, since every later call
/// would otherwise operate on a null session pointer.
fn create(
  this: v8::Global<v8::Object>,
  isolate: &v8::Isolate,
  scope: &mut v8::PinScope<'_, '_>,
  op_state: Rc<RefCell<OpState>>,
  session_type: SessionType,
) -> Self {
  let mut session: *mut ffi::nghttp2_session = std::ptr::null_mut();
  let options = Http2Options::new(session_type);
  let context = scope.get_current_context();
  let context = v8::Global::new(scope, context);
  let isolate_ptr = unsafe { isolate.as_raw_isolate_ptr() };
  // The session pointer is not known yet; it is patched in below once
  // nghttp2 has created the session.
  let inner = Box::into_raw(Box::new(Session {
    session,
    streams: HashMap::new(),
    op_state,
    context,
    isolate: isolate_ptr,
    this,
    outgoing_buffers: Vec::with_capacity(32),
    outgoing_length: 0,
    padding_strategy: options.padding_strategy(),
    graceful_close_initiated: false,
    stream: None,
    is_sending: false,
    pending_destroy: false,
    pending_rst_streams: Vec::new(),
  }));
  unsafe {
    let callbacks = create_callbacks();
    let rv = match session_type {
      SessionType::Server => ffi::nghttp2_session_server_new3(
        &mut session,
        callbacks,
        inner as *mut _,
        options.ptr(),
        std::ptr::null_mut(),
      ),
      SessionType::Client => ffi::nghttp2_session_client_new3(
        &mut session,
        callbacks,
        inner as *mut _,
        options.ptr(),
        std::ptr::null_mut(),
      ),
    };
    // nghttp2 copies the callback table into the session, so the table can be
    // released right away instead of leaking one allocation per session.
    ffi::nghttp2_session_callbacks_del(callbacks);
    assert_eq!(rv, 0, "nghttp2 failed to create the session");
    (*inner).session = session;
  }
  Self {
    type_: session_type,
    session,
    // The table has already been released above; nothing left to own.
    callbacks: std::ptr::null_mut(),
    inner,
  }
}
/// Submits a request HEADERS frame and, on success, registers the new stream.
///
/// A data provider is attached unless `options` carries
/// `STREAM_OPTION_EMPTY_PAYLOAD`. Returns nghttp2's result: the new stream id
/// when positive, otherwise the value nghttp2 returned. On success the stream
/// is created, marked as expecting trailers when
/// `STREAM_OPTION_GET_TRAILERS` is set, stored in `Session::streams`, and
/// pending output is flushed.
///
/// # Panics
///
/// Panics when nghttp2 reports an out-of-memory error.
fn submit_request(
  &self,
  priority: Http2Priority,
  headers: Http2Headers,
  options: i32,
) -> i32 {
  const NGHTTP2_ERR_NOMEM: i32 = -901;

  let mut data_provider = ffi::nghttp2_data_provider2 {
    source: ffi::nghttp2_data_source {
      ptr: std::ptr::null_mut(),
    },
    read_callback: Some(on_stream_read_callback),
  };
  let wants_body = (options & STREAM_OPTION_EMPTY_PAYLOAD) == 0;
  let provider_ptr: *mut ffi::nghttp2_data_provider2 = match wants_body {
    true => &mut data_provider,
    false => std::ptr::null_mut(),
  };

  let new_stream_id = unsafe {
    ffi::nghttp2_submit_request2(
      self.session,
      &priority.spec,
      headers.data(),
      headers.len(),
      provider_ptr,
      std::ptr::null_mut(),
    )
  };
  assert_ne!(new_stream_id, NGHTTP2_ERR_NOMEM);
  if new_stream_id <= 0 {
    return new_stream_id;
  }

  let session = unsafe { &mut *self.inner };
  let (obj, stream) =
    Http2Stream::new(session, new_stream_id, ffi::NGHTTP2_HCAT_HEADERS);
  stream.start_headers(ffi::NGHTTP2_HCAT_HEADERS);
  let expects_trailers = (options & STREAM_OPTION_GET_TRAILERS) != 0;
  if expects_trailers {
    stream.set_has_trailers(true);
  }
  session.streams.insert(new_stream_id, (obj, stream));
  session.send_pending_data();
  new_stream_id
}
}
#[op2]
impl Http2Session {
// JS constructor: `type_` 0 creates a server session, 1 a client session.
// Any other value is treated as unreachable and panics.
#[constructor]
#[cppgc]
fn new(
  #[this] this: v8::Global<v8::Object>,
  isolate: &v8::Isolate,
  scope: &mut v8::PinScope<'_, '_>,
  op_state: Rc<RefCell<OpState>>,
  #[smi] type_: i32,
) -> Http2Session {
  let session_type = match type_ {
    0 => SessionType::Server,
    1 => SessionType::Client,
    _ => unreachable!(),
  };
  Http2Session::create(this, isolate, scope, op_state, session_type)
}
// Takes over the TCP handle: stops whatever read is in progress, points the
// handle's `data` field at this session's shared state, remembers the stream,
// and restarts reading with the HTTP/2 alloc/read callbacks. The result of
// `uv_read_start` is deliberately discarded.
#[fast]
fn consume_stream(&self, #[cppgc] tcp: &crate::ops::libuv_stream::TCP) {
  let session = unsafe { &mut *self.inner };
  let uv_stream = tcp.stream();
  unsafe {
    deno_core::uv_compat::uv_read_stop(uv_stream);
    (*uv_stream).data = self.inner as *mut std::ffi::c_void;
  }
  session.stream = Some(uv_stream);
  let _ = unsafe {
    deno_core::uv_compat::uv_read_start(
      uv_stream,
      Some(h2_alloc_cb),
      Some(h2_read_cb),
    )
  };
}
#[fast]
fn receive(&self, #[buffer] data: &[u8]) {
let session = unsafe { &mut *self.inner };
session.receive_data(data);
}
#[fast]
#[reentrant]
fn send_pending(&self) {
let session = unsafe { &mut *self.inner };
session.send_pending_data();
}
// Tears down the transport and then calls the JS `ondone` handler.
//
// While a send/receive pass is running the teardown is only flagged
// (`pending_destroy`) and carried out later by `Session::receive_data`.
// Otherwise reading is stopped and the stream is shut down, or closed
// directly when the shutdown request cannot be queued. `ondone` is looked up
// on `this` and called with no arguments in either case; a missing or
// non-function property is ignored.
#[fast]
#[reentrant]
fn destroy(
  &self,
  #[this] this: v8::Global<v8::Object>,
  scope: &mut v8::PinScope<'_, '_>,
) {
  let session = unsafe { &mut *self.inner };
  if session.is_sending {
    session.pending_destroy = true;
  } else if let Some(uv_stream) = session.stream.take() {
    unsafe {
      deno_core::uv_compat::uv_read_stop(uv_stream);
      let shutdown_req =
        Box::into_raw(Box::new(deno_core::uv_compat::new_shutdown()));
      let status = deno_core::uv_compat::uv_shutdown(
        shutdown_req,
        uv_stream,
        Some(h2_shutdown_cb),
      );
      if status != 0 {
        // The request was rejected: reclaim it and close the handle instead.
        drop(Box::from_raw(shutdown_req));
        deno_core::uv_compat::uv_close(
          uv_stream as *mut deno_core::uv_compat::UvHandle,
          Some(h2_stream_close_cb),
        );
      }
    }
  }

  let receiver = v8::Local::new(scope, &this);
  let ondone_key = v8::String::new(scope, "ondone").unwrap();
  let ondone = receiver
    .get(scope, ondone_key.into())
    .map(v8::Local::<v8::Function>::try_from);
  if let Some(Ok(ondone_fn)) = ondone {
    ondone_fn.call(scope, receiver.into(), &[]);
  }
}
#[fast]
fn settings(&self, _cb: v8::Local<v8::Function>) -> bool {
let settings = Http2Settings::init(self.inner);
settings.send();
let session = unsafe { &mut *self.inner };
session.send_pending_data();
true
}
// Submits a GOAWAY frame with error `code` and optional opaque data, then
// flushes pending output. A `last_stream_id` of zero or less is replaced by
// the last stream id nghttp2 reports as processed.
#[reentrant]
fn goaway(
  &self,
  code: u32,
  last_stream_id: i32,
  #[anybuffer] maybe_data: Option<&[u8]>,
) {
  let (opaque_ptr, opaque_len) = match maybe_data {
    Some(bytes) => (bytes.as_ptr(), bytes.len()),
    None => (std::ptr::null(), 0),
  };
  let effective_last_stream_id = if last_stream_id > 0 {
    last_stream_id
  } else {
    unsafe { ffi::nghttp2_session_get_last_proc_stream_id(self.session) }
  };
  unsafe {
    ffi::nghttp2_submit_goaway(
      self.session,
      ffi::NGHTTP2_FLAG_NONE as _,
      effective_last_stream_id,
      code,
      opaque_ptr,
      opaque_len,
    );
  }
  let inner = unsafe { &mut *self.inner };
  inner.send_pending_data();
}
#[fast]
fn set_graceful_close(&self) {
let session = unsafe { &mut *self.inner };
session.graceful_close_initiated = true;
}
#[fast]
fn is_graceful_closing(&self) -> bool {
let session = unsafe { &*self.inner };
session.is_graceful_closing()
}
#[fast]
fn submit_shutdown_notice(&self) {
unsafe { ffi::nghttp2_submit_shutdown_notice(self.session) };
let session = unsafe { &mut *self.inner };
session.start_graceful_close();
session.send_pending_data();
}
#[fast]
#[smi]
fn active_stream_count(&self) -> u32 {
let session = unsafe { &*self.inner };
session.active_stream_count() as u32
}
// True while nghttp2 reports that the session wants to write or wants to
// read.
#[fast]
fn has_pending_data(&self) -> bool {
  let wants_write =
    unsafe { ffi::nghttp2_session_want_write(self.session) } != 0;
  let wants_read =
    unsafe { ffi::nghttp2_session_want_read(self.session) } != 0;
  wants_write || wants_read
}
// Copies the session's local settings into the shared settings buffer, one
// slot per `SettingsIndex` entry listed below.
#[fast]
fn local_settings(&self) {
  with_settings(|buffer| {
    let entries = [
      (
        SettingsIndex::HeaderTableSize,
        ffi::NGHTTP2_SETTINGS_HEADER_TABLE_SIZE,
      ),
      (SettingsIndex::EnablePush, ffi::NGHTTP2_SETTINGS_ENABLE_PUSH),
      (
        SettingsIndex::MaxConcurrentStreams,
        ffi::NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
      ),
      (
        SettingsIndex::InitialWindowSize,
        ffi::NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
      ),
      (
        SettingsIndex::MaxFrameSize,
        ffi::NGHTTP2_SETTINGS_MAX_FRAME_SIZE,
      ),
      (
        SettingsIndex::MaxHeaderListSize,
        ffi::NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
      ),
      (
        SettingsIndex::EnableConnectProtocol,
        ffi::NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL,
      ),
    ];
    for (slot, id) in entries {
      buffer[slot as usize] = unsafe {
        ffi::nghttp2_session_get_local_settings(self.session, id)
      } as u32;
    }
  });
}
// Copies the peer's settings into the shared settings buffer, one slot per
// `SettingsIndex` entry listed below.
#[fast]
fn remote_settings(&self) {
  with_settings(|buffer| {
    let entries = [
      (
        SettingsIndex::HeaderTableSize,
        ffi::NGHTTP2_SETTINGS_HEADER_TABLE_SIZE,
      ),
      (SettingsIndex::EnablePush, ffi::NGHTTP2_SETTINGS_ENABLE_PUSH),
      (
        SettingsIndex::MaxConcurrentStreams,
        ffi::NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
      ),
      (
        SettingsIndex::InitialWindowSize,
        ffi::NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
      ),
      (
        SettingsIndex::MaxFrameSize,
        ffi::NGHTTP2_SETTINGS_MAX_FRAME_SIZE,
      ),
      (
        SettingsIndex::MaxHeaderListSize,
        ffi::NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
      ),
      (
        SettingsIndex::EnableConnectProtocol,
        ffi::NGHTTP2_SETTINGS_ENABLE_CONNECT_PROTOCOL,
      ),
    ];
    for (slot, id) in entries {
      buffer[slot as usize] = unsafe {
        ffi::nghttp2_session_get_remote_settings(self.session, id)
      } as u32;
    }
  });
}
// Reads the session counters from nghttp2 and returns them as a serializable
// snapshot. Every value is converted to `f64`.
#[serde]
fn get_state(&self) -> Http2SessionState {
  let session = self.session;
  unsafe {
    let effective_local_window_size =
      ffi::nghttp2_session_get_effective_local_window_size(session) as f64;
    let effective_recv_data_length =
      ffi::nghttp2_session_get_effective_recv_data_length(session) as f64;
    let next_stream_id =
      ffi::nghttp2_session_get_next_stream_id(session) as f64;
    let local_window_size =
      ffi::nghttp2_session_get_local_window_size(session) as f64;
    let last_proc_stream_id =
      ffi::nghttp2_session_get_last_proc_stream_id(session) as f64;
    let remote_window_size =
      ffi::nghttp2_session_get_remote_window_size(session) as f64;
    let outbound_queue_size =
      ffi::nghttp2_session_get_outbound_queue_size(session) as f64;
    let hd_deflate_dynamic_table_size =
      ffi::nghttp2_session_get_hd_deflate_dynamic_table_size(session) as f64;
    let hd_inflate_dynamic_table_size =
      ffi::nghttp2_session_get_hd_inflate_dynamic_table_size(session) as f64;
    Http2SessionState {
      effective_local_window_size,
      effective_recv_data_length,
      next_stream_id,
      local_window_size,
      last_proc_stream_id,
      remote_window_size,
      outbound_queue_size,
      hd_deflate_dynamic_table_size,
      hd_inflate_dynamic_table_size,
    }
  }
}
// Asks nghttp2 to use `id` as the next stream id. Returns `false` (and logs)
// when nghttp2 rejects the value with a negative result, `true` otherwise.
#[fast]
fn set_next_stream_id(&self, id: i32) -> bool {
  let status =
    unsafe { ffi::nghttp2_session_set_next_stream_id(self.session, id) };
  let accepted = status >= 0;
  if accepted {
    log::debug!("set next stream id to {}", id);
  } else {
    log::debug!("failed to set next stream id to {}", id);
  }
  accepted
}
// Sets the connection-level (stream id 0) local window size and returns
// nghttp2's result code unchanged.
#[fast]
fn set_local_window_size(&self, window_size: i32) -> i32 {
  let connection_stream_id = 0;
  let status = unsafe {
    ffi::nghttp2_session_set_local_window_size(
      self.session,
      ffi::NGHTTP2_FLAG_NONE as u8,
      connection_stream_id,
      window_size,
    )
  };
  status
}
#[fast]
fn update_chunks_sent(&self) -> u32 {
let session = unsafe { &*self.inner };
session.outgoing_buffers.len() as u32
}
// Submits an ORIGIN frame built from a packed list of origins, then flushes
// pending output and returns nghttp2's result code.
//
// `origins` holds up to `count` entries, each a two-byte big-endian length
// followed by that many bytes. Parsing stops at the first entry that does not
// fit in the input; entries parsed before that point are still submitted.
//
// A negative `count` is treated as zero, and the pre-allocation is capped by
// what the input could possibly contain, so a bogus `count` can neither
// panic with a capacity overflow nor force a huge allocation.
#[fast]
fn origin(&self, #[string] origins: &str, count: i32) -> i32 {
  let origins_bytes = origins.as_bytes();
  let count = usize::try_from(count).unwrap_or(0);
  // Every entry takes at least its two length bytes.
  let max_entries = origins_bytes.len() / 2;
  let mut ov: Vec<ffi::nghttp2_origin_entry> =
    Vec::with_capacity(count.min(max_entries));
  let mut offset = 0;
  for _ in 0..count {
    if offset + 2 > origins_bytes.len() {
      break;
    }
    let len = ((origins_bytes[offset] as usize) << 8)
      | (origins_bytes[offset + 1] as usize);
    offset += 2;
    if offset + len > origins_bytes.len() {
      break;
    }
    ov.push(ffi::nghttp2_origin_entry {
      origin: origins_bytes[offset..].as_ptr() as *mut u8,
      origin_len: len,
    });
    offset += len;
  }
  // The entries point into `origins`, which outlives this call.
  let ret = unsafe {
    ffi::nghttp2_submit_origin(
      self.session,
      ffi::NGHTTP2_FLAG_NONE as u8,
      ov.as_ptr(),
      ov.len(),
    )
  };
  let session = unsafe { &mut *self.inner };
  session.send_pending_data();
  ret
}
// Submits an ALTSVC frame and flushes pending output.
//
// Returns -1 without submitting when the combined length of `origin` and
// `value` exceeds 16382 bytes, or when not exactly one of "origin given" and
// "non-zero stream id" holds. Otherwise returns nghttp2's result code.
#[fast]
fn altsvc(
  &self,
  stream_id: i32,
  #[string] origin: &str,
  #[string] value: &str,
) -> i32 {
  let origin_bytes = origin.as_bytes();
  let value_bytes = value.as_bytes();
  let total_len = origin_bytes.len() + value_bytes.len();
  if total_len > 16382 {
    return -1;
  }
  // An origin must accompany stream 0, and only stream 0.
  let has_origin = !origin_bytes.is_empty();
  let on_connection = stream_id == 0;
  if has_origin != on_connection {
    return -1;
  }
  let status = unsafe {
    ffi::nghttp2_submit_altsvc(
      self.session,
      ffi::NGHTTP2_FLAG_NONE as u8,
      stream_id,
      origin_bytes.as_ptr(),
      origin_bytes.len(),
      value_bytes.as_ptr(),
      value_bytes.len(),
    )
  };
  let inner = unsafe { &mut *self.inner };
  inner.send_pending_data();
  status
}
// Submits a PING frame carrying the 8-byte `payload` and flushes pending
// output. Returns -1 without submitting when the payload is not exactly
// 8 bytes long, otherwise nghttp2's result code.
#[fast]
fn ping(&self, #[buffer] payload: &[u8]) -> i32 {
  let Ok(opaque) = <&[u8; 8]>::try_from(payload) else {
    return -1;
  };
  let status = unsafe {
    ffi::nghttp2_submit_ping(
      self.session,
      ffi::NGHTTP2_FLAG_NONE as u8,
      opaque.as_ptr(),
    )
  };
  let inner = unsafe { &mut *self.inner };
  inner.send_pending_data();
  status
}
// Submits a new request and returns the JS object of the created stream.
//
// `stream_id`, `weight` and `exclusive` form the priority spec. When
// submission does not yield a positive stream id, that result is returned as
// an integer instead. If the stream cannot be found afterwards, -1 is
// returned.
fn request<'s>(
  &self,
  scope: &mut v8::PinScope<'s, '_>,
  #[serde] headers: (String, usize),
  options: i32,
  stream_id: i32,
  weight: i32,
  exclusive: bool,
) -> v8::Local<'s, v8::Value> {
  let priority = Http2Priority::new(stream_id, weight, exclusive);
  let headers = Http2Headers::from(headers);
  match self.submit_request(priority, headers, options) {
    failure if failure <= 0 => v8::Integer::new(scope, failure).into(),
    new_id => {
      let inner = unsafe { &*self.inner };
      match inner.find_stream_obj(new_id) {
        Some(stream_obj) => v8::Local::new(scope, stream_obj).into(),
        None => v8::Integer::new(scope, -1).into(),
      }
    }
  }
}
}
// Stores the JavaScript callback functions in `OpState` as a
// `SessionCallbacks` value.
#[op2]
pub fn op_http2_callbacks(
  state: &mut OpState,
  #[scoped] session_internal_error_cb: v8::Global<v8::Function>,
  #[scoped] priority_frame_cb: v8::Global<v8::Function>,
  #[scoped] settings_frame_cb: v8::Global<v8::Function>,
  #[scoped] ping_frame_cb: v8::Global<v8::Function>,
  #[scoped] headers_frame_cb: v8::Global<v8::Function>,
  #[scoped] frame_error_cb: v8::Global<v8::Function>,
  #[scoped] goaway_data_cb: v8::Global<v8::Function>,
  #[scoped] alt_svc_cb: v8::Global<v8::Function>,
  #[scoped] origin_frame_cb: v8::Global<v8::Function>,
  #[scoped] stream_trailers_cb: v8::Global<v8::Function>,
  #[scoped] stream_close_cb: v8::Global<v8::Function>,
) {
  let callbacks = SessionCallbacks {
    session_internal_error_cb,
    priority_frame_cb,
    settings_frame_cb,
    ping_frame_cb,
    headers_frame_cb,
    frame_error_cb,
    goaway_data_cb,
    alt_svc_cb,
    stream_trailers_cb,
    stream_close_cb,
    origin_frame_cb,
  };
  state.put(callbacks);
}
// Returns the `JSHttp2State` built by `JSHttp2State::create`, which wraps the
// thread-local session-state, stream-state, options and settings buffers as
// JS values.
#[op2]
#[serde]
pub fn op_http2_http_state<'a>(
scope: &mut v8::PinScope<'a, 'a>,
) -> JSHttp2State<'a> {
JSHttp2State::create(scope)
}