hang/
lib.rs

1use tokio::runtime::Runtime;
2use url::Url;
3
4use bytes::Bytes;
5use hang::catalog::{Video, VideoConfig, H264};
6use hang::model::{Frame, Timestamp, TrackProducer};
7use hang::{Catalog, CatalogProducer};
8use moq_lite::{BroadcastProducer, Track};
9use std::ffi::CStr;
10use std::os::raw::c_char;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::thread::JoinHandle;
14use std::{collections::HashMap, time::Duration};
15
16static IMPORT: OnceLock<Mutex<ImportJoy>> = OnceLock::new();
17static HANDLE: OnceLock<Mutex<JoinHandle<()>>> = OnceLock::new();
18static RUNNING: AtomicBool = AtomicBool::new(false);
19
20/// # Safety
21///
22/// The caller must ensure that:
23/// - `c_server_url` and `c_path` are valid null-terminated C strings
24/// - The pointers remain valid for the duration of this function call
25#[no_mangle]
26pub unsafe extern "C" fn hang_start_from_c(
27	c_server_url: *const c_char,
28	c_path: *const c_char,
29	_c_profile: *const c_char,
30) {
31	// Validate C string pointers
32	if c_server_url.is_null() || c_path.is_null() {
33		return;
34	}
35
36	let cstr_server_url = CStr::from_ptr(c_server_url);
37	let server_url = match cstr_server_url.to_str() {
38		Ok(s) => s,
39		Err(_) => return,
40	};
41
42	let url = match Url::parse(server_url) {
43		Ok(u) => u,
44		Err(_) => return,
45	};
46
47	let cstr_path = CStr::from_ptr(c_path);
48	let path = match cstr_path.to_str() {
49		Ok(s) => s.to_string(),
50		Err(_) => return,
51	};
52
53	RUNNING.store(true, Ordering::Relaxed);
54
55	let handle = std::thread::spawn(move || {
56		let rt = match Runtime::new() {
57			Ok(rt) => rt,
58			Err(_) => return,
59		};
60
61		rt.block_on(async {
62			let _ = client(url, path).await;
63		});
64	});
65
66	let _ = HANDLE.set(Mutex::new(handle));
67}
68
69#[no_mangle]
70pub extern "C" fn hang_stop_from_c() {
71	RUNNING.store(false, Ordering::Relaxed);
72}
73
74/// # Safety
75///
76/// The caller must ensure that:
77/// - `data` points to a valid buffer of at least `size` bytes
78/// - The buffer remains valid for the duration of this function call
79#[no_mangle]
80pub unsafe extern "C" fn hang_write_video_packet_from_c(data: *const u8, size: usize, keyframe: i32, dts: u64) {
81	// Validate pointer and size
82	if data.is_null() || size == 0 {
83		return;
84	}
85
86	if let Some(import_mutex) = IMPORT.get() {
87		if let Ok(mut import) = import_mutex.lock() {
88			// SAFETY: Caller of hang_write_video_packet_from_c guarantees data is valid
89			import.write_video_frame(data, size, keyframe > 0, dts);
90		}
91	}
92}
93
94pub async fn client(url: Url, name: String) -> anyhow::Result<()> {
95	let broadcast = moq_lite::Broadcast::produce();
96	let config = moq_native::ClientConfig::default();
97	let client = config.init()?;
98
99	let connection = client.connect(url).await?;
100
101	let origin = moq_lite::Origin::produce();
102
103	let session = moq_lite::Session::connect(connection, origin.consumer, None).await?;
104
105	let mut import = ImportJoy::new(broadcast.producer);
106	import.init();
107	let _ = IMPORT.set(Mutex::new(import));
108
109	origin.producer.publish_broadcast(&name, broadcast.consumer);
110
111	while RUNNING.load(Ordering::Relaxed) {
112		tokio::time::sleep(Duration::from_millis(30)).await;
113	}
114
115	session.close(moq_lite::Error::Cancel);
116
117	Ok(())
118}
119
120pub struct ImportJoy {
121	// The broadcast being produced
122	broadcast: BroadcastProducer,
123
124	// The catalog being produced
125	catalog: CatalogProducer,
126
127	// A lookup to tracks in the broadcast
128	tracks: HashMap<u32, TrackProducer>,
129
130	sent_one_keyframe: bool,
131}
132
133impl ImportJoy {
134	/// Create a new importer that will write to the given broadcast.
135	pub fn new(mut broadcast: BroadcastProducer) -> Self {
136		let catalog = Catalog::default().produce();
137		broadcast.insert_track(catalog.consumer.track);
138
139		Self {
140			broadcast,
141			catalog: catalog.producer,
142			tracks: HashMap::default(),
143			sent_one_keyframe: false,
144		}
145	}
146
147	pub fn init(&mut self) {
148		// Produce the catalog
149		let mut video_renditions = HashMap::new();
150
151		let (track_name, config) = Self::init_video();
152		let track = Track {
153			name: track_name.clone(),
154			priority: 2,
155		};
156		let track_produce = track.produce();
157		self.broadcast.insert_track(track_produce.consumer);
158		video_renditions.insert(track_name, config);
159
160		self.tracks.insert(0, track_produce.producer.into());
161
162		if !video_renditions.is_empty() {
163			let video = Video {
164				renditions: video_renditions,
165				priority: 2,
166				display: None,
167				rotation: None,
168				flip: None,
169			};
170			self.catalog.set_video(Some(video));
171		}
172
173		self.catalog.publish();
174	}
175
176	pub fn init_video() -> (String, VideoConfig) {
177		let name = String::from("video1");
178
179		let config = VideoConfig {
180			coded_width: Some(1280),
181			coded_height: Some(720),
182			codec: H264 {
183				profile: 0x4d,
184				constraints: 0x00,
185				level: 0x29,
186			}
187			.into(),
188			description: None,
189			// TODO: populate these fields
190			framerate: None,
191			bitrate: None,
192			display_ratio_width: None,
193			display_ratio_height: None,
194			optimize_for_latency: None,
195		};
196
197		(name, config)
198	}
199
200	/// # Safety
201	///
202	/// The caller must ensure that `data` points to a valid buffer of at least `size` bytes
203	pub unsafe fn write_video_frame(&mut self, data: *const u8, size: usize, keyframe: bool, dts: u64) {
204		if !self.sent_one_keyframe {
205			if !keyframe {
206				return;
207			}
208			self.sent_one_keyframe = true;
209		}
210
211		let Some(track) = self.tracks.get_mut(&0) else {
212			return;
213		};
214
215		// Use copy_from_slice to own the data, avoiding use-after-free when C caller frees the buffer
216		let payload = Bytes::copy_from_slice(std::slice::from_raw_parts(data, size));
217
218		let timestamp = Timestamp::from_micros(dts);
219
220		let frame = Frame {
221			timestamp,
222			keyframe,
223			payload,
224		};
225
226		track.write(frame);
227	}
228}