Skip to main content

openruntimes/
lib.rs

1use serde::{Deserialize, Serialize};
2use serde_json::json;
3use std::collections::HashMap;
4use std::fs::{create_dir_all, OpenOptions};
5use std::io::Write as IoWrite;
6use std::sync::{Arc, Mutex};
7use std::time::{SystemTime, UNIX_EPOCH};
8
/// Upper bound on a single log message before `Logger::write` truncates it.
///
/// The limit is compared against `String::len`, i.e. it is measured in bytes.
const MAX_LOG_SIZE: usize = 8000;
10
/// A value that can be rendered as the text of a log message.
pub trait LogMessage {
    /// Returns the text recorded in the log for this value.
    fn to_log_string(&self) -> String;
}

// Implements `LogMessage` for types whose log text is simply their `Display`
// output. Keeping the list in one place avoids a hand-written impl per type.
macro_rules! impl_log_message_via_display {
    ($($ty:ty),+ $(,)?) => {
        $(
            impl LogMessage for $ty {
                fn to_log_string(&self) -> String {
                    self.to_string()
                }
            }
        )+
    };
}

// Strings, every primitive integer and float width, `bool` and `char`.
impl_log_message_via_display!(
    &str, String, bool, char,
    i8, i16, i32, i64, i128, isize,
    u8, u16, u32, u64, u128, usize,
    f32, f64,
);
68
69impl LogMessage for serde_json::Value {
70    fn to_log_string(&self) -> String {
71        serde_json::to_string(self).unwrap_or_else(|_| format!("{:?}", self))
72    }
73}
74
75impl LogMessage for Vec<&str> {
76    fn to_log_string(&self) -> String {
77        serde_json::to_string(self).unwrap_or_else(|_| format!("{:?}", self))
78    }
79}
80
81impl LogMessage for Vec<String> {
82    fn to_log_string(&self) -> String {
83        serde_json::to_string(self).unwrap_or_else(|_| format!("{:?}", self))
84    }
85}
86
87#[derive(Clone)]
88pub struct Context {
89    pub req: ContextRequest,
90    pub res: ContextResponse,
91    logger: Logger,
92}
93
94impl Context {
95    pub fn new(logger: Logger) -> Self {
96        Context {
97            req: ContextRequest::new(),
98            res: ContextResponse::new(),
99            logger,
100        }
101    }
102
103    pub fn log<T: LogMessage>(&self, message: T) {
104        let msg = message.to_log_string();
105        self.logger.write(vec![msg], LoggerType::Log, false);
106    }
107
108    pub fn log_multiple(&self, messages: Vec<String>) {
109        self.logger.write(messages, LoggerType::Log, false);
110    }
111
112    pub fn error<T: LogMessage>(&self, message: T) {
113        let msg = message.to_log_string();
114        self.logger.write(vec![msg], LoggerType::Error, false);
115    }
116
117    pub fn error_multiple(&self, messages: Vec<String>) {
118        self.logger.write(messages, LoggerType::Error, false);
119    }
120
121    pub fn get_logger(&self) -> &Logger {
122        &self.logger
123    }
124
125    pub fn get_logger_mut(&mut self) -> &mut Logger {
126        &mut self.logger
127    }
128}
129
130#[derive(Clone, Debug)]
131pub struct ContextRequest {
132    pub headers: HashMap<String, String>,
133    pub method: String,
134    pub url: String,
135    pub scheme: String,
136    pub host: String,
137    pub port: u16,
138    pub path: String,
139    pub query_string: String,
140    pub query: HashMap<String, String>,
141    body_binary: Vec<u8>,
142    body_parsed: Option<serde_json::Value>,
143}
144
145impl ContextRequest {
146    pub fn new() -> Self {
147        ContextRequest {
148            headers: HashMap::new(),
149            method: String::new(),
150            url: String::new(),
151            scheme: String::new(),
152            host: String::new(),
153            port: 80,
154            path: String::new(),
155            query_string: String::new(),
156            query: HashMap::new(),
157            body_binary: Vec::new(),
158            body_parsed: None,
159        }
160    }
161
162    pub fn set_body_binary(&mut self, data: Vec<u8>) {
163        self.body_binary = data;
164        self.body_parsed = None;
165    }
166
167    pub fn body_binary(&self) -> Vec<u8> {
168        self.body_binary.clone()
169    }
170
171    pub fn body_text(&self) -> String {
172        String::from_utf8_lossy(&self.body_binary).to_string()
173    }
174
175    pub fn body_json<T>(&mut self) -> Result<T, serde_json::Error>
176    where
177        T: for<'de> Deserialize<'de>,
178    {
179        if self.body_parsed.is_none() {
180            let value: serde_json::Value = serde_json::from_slice(&self.body_binary)?;
181            self.body_parsed = Some(value);
182        }
183
184        if let Some(ref parsed) = self.body_parsed {
185            serde_json::from_value(parsed.clone())
186        } else {
187            serde_json::from_slice(&self.body_binary)
188        }
189    }
190
191    pub fn body(&mut self) -> serde_json::Value {
192        let content_type = self
193            .headers
194            .get("content-type")
195            .map(|s| s.to_lowercase())
196            .unwrap_or_default();
197
198        if content_type.contains("application/json") {
199            if self.body_binary.is_empty() {
200                return serde_json::Value::Object(serde_json::Map::new());
201            }
202
203            if self.body_parsed.is_none() {
204                if let Ok(value) = serde_json::from_slice(&self.body_binary) {
205                    self.body_parsed = Some(value);
206                }
207            }
208            self.body_parsed
209                .clone()
210                .unwrap_or(serde_json::Value::Object(serde_json::Map::new()))
211        } else {
212            serde_json::Value::String(self.body_text())
213        }
214    }
215
216    #[deprecated(note = "Use body_binary() instead")]
217    pub fn body_raw(&self) -> Vec<u8> {
218        self.body_binary()
219    }
220}
221
222impl Default for ContextRequest {
223    fn default() -> Self {
224        Self::new()
225    }
226}
227
/// A response: status code, raw body bytes and headers.
#[derive(Clone, Debug)]
pub struct Response {
    /// Numeric status code.
    pub status_code: u16,
    /// Raw body bytes.
    pub body: Vec<u8>,
    /// Header name/value pairs.
    pub headers: HashMap<String, String>,
}

impl Response {
    /// Creates a response with status 200, an empty body and no headers.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for Response {
    /// Status 200, empty body, no headers.
    fn default() -> Self {
        let status_code = 200;
        let body = Vec::new();
        let headers = HashMap::new();
        Self {
            status_code,
            body,
            headers,
        }
    }
}
250
251#[derive(Clone)]
252pub struct ContextResponse;
253
254impl ContextResponse {
255    pub fn new() -> Self {
256        ContextResponse
257    }
258
259    pub fn text<S: Into<String>>(
260        &self,
261        text: S,
262        status_code: Option<u16>,
263        headers: Option<HashMap<String, String>>,
264    ) -> Response {
265        let text_string = text.into();
266        let mut response = Response {
267            status_code: status_code.unwrap_or(200),
268            body: text_string.into_bytes(),
269            headers: headers.unwrap_or_default(),
270        };
271
272        if !response.headers.contains_key("content-type") {
273            response
274                .headers
275                .insert("content-type".to_string(), "text/plain".to_string());
276        }
277
278        response
279    }
280
281    pub fn json<T: Serialize>(
282        &self,
283        data: T,
284        status_code: Option<u16>,
285        headers: Option<HashMap<String, String>>,
286    ) -> Response {
287        let json_string = serde_json::to_string(&data).unwrap_or_else(|_| "{}".to_string());
288        let mut response = Response {
289            status_code: status_code.unwrap_or(200),
290            body: json_string.into_bytes(),
291            headers: headers.unwrap_or_default(),
292        };
293
294        if !response.headers.contains_key("content-type") {
295            response
296                .headers
297                .insert("content-type".to_string(), "application/json".to_string());
298        }
299
300        response
301    }
302
303    pub fn binary(
304        &self,
305        data: Vec<u8>,
306        status_code: Option<u16>,
307        headers: Option<HashMap<String, String>>,
308    ) -> Response {
309        let mut response = Response {
310            status_code: status_code.unwrap_or(200),
311            body: data,
312            headers: headers.unwrap_or_default(),
313        };
314
315        if !response.headers.contains_key("content-type") {
316            response.headers.insert(
317                "content-type".to_string(),
318                "application/octet-stream".to_string(),
319            );
320        }
321
322        response
323    }
324
325    pub fn empty(&self) -> Response {
326        Response {
327            status_code: 204,
328            body: Vec::new(),
329            headers: HashMap::new(),
330        }
331    }
332
333    pub fn redirect<S: Into<String>>(
334        &self,
335        url: S,
336        status_code: Option<u16>,
337        headers: Option<HashMap<String, String>>,
338    ) -> Response {
339        let url_string = url.into();
340        let mut response_headers = headers.unwrap_or_default();
341        response_headers.insert("location".to_string(), url_string);
342
343        Response {
344            status_code: status_code.unwrap_or(301),
345            body: Vec::new(),
346            headers: response_headers,
347        }
348    }
349
350    #[deprecated(note = "Use text(), json(), or binary() instead")]
351    pub fn send<S: Into<String>>(&self, data: S) -> Response {
352        self.text(data, None, None)
353    }
354}
355
356impl Default for ContextResponse {
357    fn default() -> Self {
358        Self::new()
359    }
360}
361
/// Severity of a log entry.
///
/// `Logger::write` records `Log` entries with type `"log"` / stream `"stdout"`
/// and `Error` entries with type `"error"` / stream `"stderr"`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoggerType {
    /// A regular message.
    Log,
    /// An error message.
    Error,
}
367
/// File descriptors recorded while stdout/stderr are redirected into pipes.
struct NativeCapture {
    /// Duplicate of the stdout descriptor taken before the redirect; used to
    /// restore stdout and closed afterwards.
    original_stdout: libc::c_int,
    /// Duplicate of the stderr descriptor taken before the redirect; used to
    /// restore stderr and closed afterwards.
    original_stderr: libc::c_int,
    /// Read end of the pipe that stdout is redirected into.
    stdout_read: libc::c_int,
    /// Read end of the pipe that stderr is redirected into.
    stderr_read: libc::c_int,
}
374
// Native log capture replaces process-global stdout/stderr file descriptors.
// Tracking the active capture in a single global slot is required for safety
// under concurrent execution: if two requests both took their own per-Logger
// redirect, the second's dup2 would clobber the first, and the first's revert
// would leave the second writing to a torn-down pipe. With a global slot,
// the first request to override owns the redirect; concurrent requests
// gracefully skip native capture (their structured context.log calls still
// work). Required by runtimes whose timeout path can leave a user function
// running on a detached blocking thread.
//
// `Some` while a redirect is installed, `None` otherwise.
// Lock order: `Logger::override_native_logs` and `Logger::revert_native_logs`
// lock the logger's own `owns_native_capture` flag first and this slot second.
static NATIVE_CAPTURE: Mutex<Option<NativeCapture>> = Mutex::new(None);
385
386#[derive(Clone)]
387pub struct Logger {
388    pub id: String,
389    enabled: bool,
390    include_native: bool,
391    logs: Arc<Mutex<Vec<serde_json::Value>>>,
392    owns_native_capture: Arc<Mutex<bool>>,
393    native_info_logged: Arc<Mutex<bool>>,
394}
395
396impl Logger {
397    pub fn new(logging: &str, log_id: Option<String>) -> Result<Self, String> {
398        let enabled = logging == "" || logging == "enabled";
399        let include_native = enabled;
400
401        let id = if let Some(provided_id) = log_id {
402            provided_id
403        } else if std::env::var("OPEN_RUNTIMES_ENV").unwrap_or_default() == "development" {
404            "dev".to_string()
405        } else {
406            Self::generate_id()
407        };
408
409        Ok(Logger {
410            id,
411            enabled,
412            include_native,
413            logs: Arc::new(Mutex::new(Vec::new())),
414            owns_native_capture: Arc::new(Mutex::new(false)),
415            native_info_logged: Arc::new(Mutex::new(false)),
416        })
417    }
418
419    fn generate_id() -> String {
420        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
421        let sec = now.as_secs();
422        let msec = now.subsec_millis();
423
424        let sec_hex = format!("{:x}", sec);
425        let msec_hex = format!("{:05x}", msec);
426
427        let mut random_padding = String::new();
428        for _ in 0..7 {
429            let rand_digit = rand::random::<u8>() % 16;
430            random_padding.push_str(&format!("{:x}", rand_digit));
431        }
432
433        format!("{}{}{}", sec_hex, msec_hex, random_padding)
434    }
435
436    pub fn write(&self, messages: Vec<String>, log_type: LoggerType, native: bool) {
437        if !native && !self.enabled {
438            return;
439        }
440
441        if native && !self.include_native {
442            return;
443        }
444
445        if native {
446            let mut info_logged = match self.native_info_logged.lock() {
447                Ok(l) => l,
448                Err(_) => return,
449            };
450            if !*info_logged {
451                *info_logged = true;
452                drop(info_logged);
453                self.write(
454                    vec!["Native logs detected. Use context.log() or context.error() for better experience.".to_string()],
455                    LoggerType::Log,
456                    false,
457                );
458            }
459        }
460
461        let type_str = match log_type {
462            LoggerType::Log => "log",
463            LoggerType::Error => "error",
464        };
465
466        let stream = match log_type {
467            LoggerType::Log => "stdout",
468            LoggerType::Error => "stderr",
469        };
470
471        let timestamp = SystemTime::now()
472            .duration_since(UNIX_EPOCH)
473            .unwrap()
474            .as_millis();
475
476        let mut message = messages.join(" ");
477
478        if message.len() > MAX_LOG_SIZE {
479            let mut safe_len = MAX_LOG_SIZE;
480            while safe_len > 0 && !message.is_char_boundary(safe_len) {
481                safe_len -= 1;
482            }
483            message.truncate(safe_len);
484            message.push_str("... Log truncated due to size limit (8000 characters)");
485        }
486
487        let log_entry = json!({
488            "timestamp": timestamp,
489            "type": type_str,
490            "message": message,
491            "stream": stream,
492        });
493
494        if let Ok(mut logs) = self.logs.lock() {
495            logs.push(log_entry);
496        }
497    }
498
499    pub fn override_native_logs(&mut self) {
500        if !self.enabled || !self.include_native {
501            return;
502        }
503
504        let mut owns = match self.owns_native_capture.lock() {
505            Ok(o) => o,
506            Err(p) => p.into_inner(),
507        };
508        if *owns {
509            return;
510        }
511
512        let mut global = match NATIVE_CAPTURE.lock() {
513            Ok(g) => g,
514            Err(p) => p.into_inner(),
515        };
516        if global.is_some() {
517            // Another logger already redirected the process-global stdout/stderr.
518            // Skip native capture for this request — concurrent redirects would
519            // race on the dup2 + revert sequence.
520            return;
521        }
522
523        unsafe {
524            let mut stdout_pipe: [libc::c_int; 2] = [-1, -1];
525            let mut stderr_pipe: [libc::c_int; 2] = [-1, -1];
526
527            if libc::pipe(stdout_pipe.as_mut_ptr()) != 0 {
528                return;
529            }
530            if libc::pipe(stderr_pipe.as_mut_ptr()) != 0 {
531                libc::close(stdout_pipe[0]);
532                libc::close(stdout_pipe[1]);
533                return;
534            }
535
536            let original_stdout = libc::dup(libc::STDOUT_FILENO);
537            let original_stderr = libc::dup(libc::STDERR_FILENO);
538
539            if original_stdout < 0 || original_stderr < 0 {
540                libc::close(stdout_pipe[0]);
541                libc::close(stdout_pipe[1]);
542                libc::close(stderr_pipe[0]);
543                libc::close(stderr_pipe[1]);
544                if original_stdout >= 0 {
545                    libc::close(original_stdout);
546                }
547                if original_stderr >= 0 {
548                    libc::close(original_stderr);
549                }
550                return;
551            }
552
553            if libc::dup2(stdout_pipe[1], libc::STDOUT_FILENO) < 0 {
554                libc::dup2(original_stdout, libc::STDOUT_FILENO);
555                libc::close(original_stdout);
556                libc::close(original_stderr);
557                libc::close(stdout_pipe[0]);
558                libc::close(stdout_pipe[1]);
559                libc::close(stderr_pipe[0]);
560                libc::close(stderr_pipe[1]);
561                return;
562            }
563
564            if libc::dup2(stderr_pipe[1], libc::STDERR_FILENO) < 0 {
565                libc::dup2(original_stdout, libc::STDOUT_FILENO);
566                libc::dup2(original_stderr, libc::STDERR_FILENO);
567                libc::close(original_stdout);
568                libc::close(original_stderr);
569                libc::close(stdout_pipe[0]);
570                libc::close(stdout_pipe[1]);
571                libc::close(stderr_pipe[0]);
572                libc::close(stderr_pipe[1]);
573                return;
574            }
575
576            libc::close(stdout_pipe[1]);
577            libc::close(stderr_pipe[1]);
578
579            *global = Some(NativeCapture {
580                original_stdout,
581                original_stderr,
582                stdout_read: stdout_pipe[0],
583                stderr_read: stderr_pipe[0],
584            });
585            *owns = true;
586        }
587    }
588
589    pub fn revert_native_logs(&mut self) {
590        let mut owns = match self.owns_native_capture.lock() {
591            Ok(o) => o,
592            Err(p) => p.into_inner(),
593        };
594        if !*owns {
595            return;
596        }
597
598        let capture = {
599            let mut global = match NATIVE_CAPTURE.lock() {
600                Ok(g) => g,
601                Err(p) => p.into_inner(),
602            };
603            global.take()
604        };
605
606        *owns = false;
607
608        let Some(capture) = capture else { return };
609
610        unsafe {
611            let _ = std::io::stdout().flush();
612            let _ = std::io::stderr().flush();
613
614            libc::dup2(capture.original_stdout, libc::STDOUT_FILENO);
615            libc::dup2(capture.original_stderr, libc::STDERR_FILENO);
616
617            libc::close(capture.original_stdout);
618            libc::close(capture.original_stderr);
619        }
620
621        let stdout_data = unsafe {
622            let mut data = String::new();
623            let mut buf = [0u8; 4096];
624            loop {
625                let n = libc::read(
626                    capture.stdout_read,
627                    buf.as_mut_ptr() as *mut libc::c_void,
628                    buf.len(),
629                );
630                if n <= 0 {
631                    break;
632                }
633                data.push_str(&String::from_utf8_lossy(&buf[..n as usize]));
634            }
635            libc::close(capture.stdout_read);
636            data
637        };
638
639        let stderr_data = unsafe {
640            let mut data = String::new();
641            let mut buf = [0u8; 4096];
642            loop {
643                let n = libc::read(
644                    capture.stderr_read,
645                    buf.as_mut_ptr() as *mut libc::c_void,
646                    buf.len(),
647                );
648                if n <= 0 {
649                    break;
650                }
651                data.push_str(&String::from_utf8_lossy(&buf[..n as usize]));
652            }
653            libc::close(capture.stderr_read);
654            data
655        };
656
657        if !stdout_data.is_empty() {
658            for line in stdout_data.lines() {
659                if !line.is_empty() {
660                    self.write(vec![line.to_string()], LoggerType::Log, true);
661                }
662            }
663        }
664
665        if !stderr_data.is_empty() {
666            for line in stderr_data.lines() {
667                if !line.is_empty() {
668                    self.write(vec![line.to_string()], LoggerType::Error, true);
669                }
670            }
671        }
672    }
673
674    pub fn end(&self) {
675        if !self.enabled {
676            return;
677        }
678
679        let logs_dir = "/mnt/logs";
680        if let Err(_) = create_dir_all(logs_dir) {
681            eprintln!("Failed to create logs directory");
682            return;
683        }
684
685        let logs_file_path = format!("{}/{}_logs.log", logs_dir, self.id);
686        let errors_file_path = format!("{}/{}_errors.log", logs_dir, self.id);
687
688        let mut logs_file = match OpenOptions::new()
689            .create(true)
690            .write(true)
691            .append(true)
692            .open(&logs_file_path)
693        {
694            Ok(f) => f,
695            Err(e) => {
696                eprintln!("Failed to open logs file: {}", e);
697                return;
698            }
699        };
700
701        let mut errors_file = match OpenOptions::new()
702            .create(true)
703            .write(true)
704            .append(true)
705            .open(&errors_file_path)
706        {
707            Ok(f) => f,
708            Err(e) => {
709                eprintln!("Failed to open errors file: {}", e);
710                return;
711            }
712        };
713
714        if let Ok(logs) = self.logs.lock() {
715            for log in logs.iter() {
716                let log_type = log.get("type").and_then(|v| v.as_str()).unwrap_or("");
717
718                if let Ok(log_str) = serde_json::to_string(log) {
719                    let file_to_write = if log_type == "error" {
720                        &mut errors_file
721                    } else {
722                        &mut logs_file
723                    };
724
725                    if let Err(e) = writeln!(file_to_write, "{}", log_str) {
726                        eprintln!("Failed to write log: {}", e);
727                    }
728                }
729            }
730        }
731
732        if let Err(e) = logs_file.flush() {
733            eprintln!("Failed to flush logs file: {}", e);
734        }
735
736        if let Err(e) = errors_file.flush() {
737            eprintln!("Failed to flush errors file: {}", e);
738        }
739    }
740}
741
/// Renders `value` with its `Debug` formatting (`{:?}`).
pub fn format_log_message(value: &dyn std::fmt::Debug) -> String {
    use std::fmt::Write as _;

    let mut rendered = String::new();
    // Writing into a `String` only fails if the `Debug` impl itself reports an
    // error, which `format!` would also turn into a panic.
    write!(rendered, "{:?}", value)
        .expect("a formatting trait implementation returned an error");
    rendered
}
745
#[cfg(test)]
mod tests {
    use super::*;

    // Native capture swaps the process-wide stdout/stderr descriptors and only
    // one logger can own it at a time. The test harness runs tests on several
    // threads, so tests that rely on owning the capture must not overlap:
    // whichever lost the race would capture nothing and fail its assertions.
    static NATIVE_CAPTURE_TESTS: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Blocks until no other native-capture test is running. A lock poisoned
    /// by a failed test is still usable for mutual exclusion.
    fn serialize_native_tests() -> std::sync::MutexGuard<'static, ()> {
        NATIVE_CAPTURE_TESTS
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Writes `bytes` straight to descriptor `fd`, bypassing Rust's buffered
    /// (and harness-captured) stdout/stderr handles.
    fn write_fd(fd: libc::c_int, bytes: &[u8]) {
        // SAFETY: `bytes` is a live buffer of `bytes.len()` readable bytes.
        unsafe {
            libc::write(fd, bytes.as_ptr() as *const libc::c_void, bytes.len());
        }
    }

    /// Returns the `message` field of every buffered entry.
    fn messages(logger: &Logger) -> Vec<String> {
        let logs = logger.logs.lock().unwrap();
        let collected: Vec<String> = logs
            .iter()
            .map(|entry| {
                entry
                    .get("message")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string()
            })
            .collect();
        collected
    }

    #[test]
    fn test_native_log_capture() {
        let _serial = serialize_native_tests();

        let mut logger = Logger::new("enabled", Some("test".to_string())).unwrap();
        logger.override_native_logs();
        write_fd(libc::STDOUT_FILENO, b"Native log\n");
        write_fd(libc::STDERR_FILENO, b"Native error\n");
        logger.revert_native_logs();

        let log_messages = messages(&logger);

        assert!(log_messages.iter().any(|m| m.contains("Native logs detected. Use context.log() or context.error() for better experience.")));
        assert!(log_messages.iter().any(|m| m == "Native log"));
        assert!(log_messages.iter().any(|m| m == "Native error"));
    }

    #[test]
    fn test_native_info_logged_once() {
        let _serial = serialize_native_tests();

        let mut logger = Logger::new("enabled", Some("test2".to_string())).unwrap();
        logger.override_native_logs();
        write_fd(libc::STDOUT_FILENO, b"First native log\n");
        write_fd(libc::STDOUT_FILENO, b"Second native log\n");
        logger.revert_native_logs();

        let info_count = messages(&logger)
            .iter()
            .filter(|m| m.contains("Native logs detected"))
            .count();

        assert_eq!(info_count, 1);
    }

    #[test]
    fn test_disabled_logging_no_native_capture() {
        // A disabled logger never touches the global capture slot, so this
        // test does not need to be serialized with the others.
        let mut logger = Logger::new("disabled", Some("test3".to_string())).unwrap();
        logger.override_native_logs();
        println!("Should not be captured");
        logger.revert_native_logs();

        let logs = logger.logs.lock().unwrap();
        assert!(logs.is_empty());
    }
}
807}