1use tokio::runtime::Runtime;
2use url::Url;
3
4use bytes::Bytes;
5use hang::catalog::{Video, VideoConfig, H264};
6use hang::model::{Frame, Timestamp, TrackProducer};
7use hang::{Catalog, CatalogProducer};
8use moq_lite::{BroadcastProducer, Track};
9use std::ffi::CStr;
10use std::os::raw::c_char;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::{Mutex, OnceLock};
13use std::thread::JoinHandle;
14use std::{collections::HashMap, time::Duration};
15
16static IMPORT: OnceLock<Mutex<ImportJoy>> = OnceLock::new();
17static HANDLE: OnceLock<Mutex<JoinHandle<()>>> = OnceLock::new();
18static RUNNING: AtomicBool = AtomicBool::new(false);
19
20#[no_mangle]
26pub unsafe extern "C" fn hang_start_from_c(
27 c_server_url: *const c_char,
28 c_path: *const c_char,
29 _c_profile: *const c_char,
30) {
31 if c_server_url.is_null() || c_path.is_null() {
33 return;
34 }
35
36 let cstr_server_url = CStr::from_ptr(c_server_url);
37 let server_url = match cstr_server_url.to_str() {
38 Ok(s) => s,
39 Err(_) => return,
40 };
41
42 let url = match Url::parse(server_url) {
43 Ok(u) => u,
44 Err(_) => return,
45 };
46
47 let cstr_path = CStr::from_ptr(c_path);
48 let path = match cstr_path.to_str() {
49 Ok(s) => s.to_string(),
50 Err(_) => return,
51 };
52
53 RUNNING.store(true, Ordering::Relaxed);
54
55 let handle = std::thread::spawn(move || {
56 let rt = match Runtime::new() {
57 Ok(rt) => rt,
58 Err(_) => return,
59 };
60
61 rt.block_on(async {
62 let _ = client(url, path).await;
63 });
64 });
65
66 let _ = HANDLE.set(Mutex::new(handle));
67}
68
69#[no_mangle]
70pub extern "C" fn hang_stop_from_c() {
71 RUNNING.store(false, Ordering::Relaxed);
72}
73
74#[no_mangle]
80pub unsafe extern "C" fn hang_write_video_packet_from_c(data: *const u8, size: usize, keyframe: i32, dts: u64) {
81 if data.is_null() || size == 0 {
83 return;
84 }
85
86 if let Some(import_mutex) = IMPORT.get() {
87 if let Ok(mut import) = import_mutex.lock() {
88 import.write_video_frame(data, size, keyframe > 0, dts);
90 }
91 }
92}
93
94pub async fn client(url: Url, name: String) -> anyhow::Result<()> {
95 let broadcast = moq_lite::Broadcast::produce();
96 let config = moq_native::ClientConfig::default();
97 let client = config.init()?;
98
99 let connection = client.connect(url).await?;
100
101 let origin = moq_lite::Origin::produce();
102
103 let session = moq_lite::Session::connect(connection, origin.consumer, None).await?;
104
105 let mut import = ImportJoy::new(broadcast.producer);
106 import.init();
107 let _ = IMPORT.set(Mutex::new(import));
108
109 origin.producer.publish_broadcast(&name, broadcast.consumer);
110
111 while RUNNING.load(Ordering::Relaxed) {
112 tokio::time::sleep(Duration::from_millis(30)).await;
113 }
114
115 session.close(moq_lite::Error::Cancel);
116
117 Ok(())
118}
119
120pub struct ImportJoy {
121 broadcast: BroadcastProducer,
123
124 catalog: CatalogProducer,
126
127 tracks: HashMap<u32, TrackProducer>,
129
130 sent_one_keyframe: bool,
131}
132
133impl ImportJoy {
134 pub fn new(mut broadcast: BroadcastProducer) -> Self {
136 let catalog = Catalog::default().produce();
137 broadcast.insert_track(catalog.consumer.track);
138
139 Self {
140 broadcast,
141 catalog: catalog.producer,
142 tracks: HashMap::default(),
143 sent_one_keyframe: false,
144 }
145 }
146
147 pub fn init(&mut self) {
148 let mut video_renditions = HashMap::new();
150
151 let (track_name, config) = Self::init_video();
152 let track = Track {
153 name: track_name.clone(),
154 priority: 2,
155 };
156 let track_produce = track.produce();
157 self.broadcast.insert_track(track_produce.consumer);
158 video_renditions.insert(track_name, config);
159
160 self.tracks.insert(0, track_produce.producer.into());
161
162 if !video_renditions.is_empty() {
163 let video = Video {
164 renditions: video_renditions,
165 priority: 2,
166 display: None,
167 rotation: None,
168 flip: None,
169 };
170 self.catalog.set_video(Some(video));
171 }
172
173 self.catalog.publish();
174 }
175
176 pub fn init_video() -> (String, VideoConfig) {
177 let name = String::from("video1");
178
179 let config = VideoConfig {
180 coded_width: Some(1280),
181 coded_height: Some(720),
182 codec: H264 {
183 profile: 0x4d,
184 constraints: 0x00,
185 level: 0x29,
186 }
187 .into(),
188 description: None,
189 framerate: None,
191 bitrate: None,
192 display_ratio_width: None,
193 display_ratio_height: None,
194 optimize_for_latency: None,
195 };
196
197 (name, config)
198 }
199
200 pub unsafe fn write_video_frame(&mut self, data: *const u8, size: usize, keyframe: bool, dts: u64) {
204 if !self.sent_one_keyframe {
205 if !keyframe {
206 return;
207 }
208 self.sent_one_keyframe = true;
209 }
210
211 let Some(track) = self.tracks.get_mut(&0) else {
212 return;
213 };
214
215 let payload = Bytes::copy_from_slice(std::slice::from_raw_parts(data, size));
217
218 let timestamp = Timestamp::from_micros(dts);
219
220 let frame = Frame {
221 timestamp,
222 keyframe,
223 payload,
224 };
225
226 track.write(frame);
227 }
228}