1use serde::{Deserialize, Serialize};
16use std::time::{Duration, Instant};
17use thiserror::Error;
18
19#[derive(Debug, Error)]
20pub enum ProfilingError {
21 #[error("Job execution failed: {0}")]
22 ExecutionFailed(String),
23 #[error("Insufficient samples: expected at least {expected}, got {actual}")]
24 InsufficientSamples { expected: u32, actual: u32 },
25 #[error("Invalid configuration: {0}")]
26 InvalidConfiguration(String),
27}
28
/// Settings that control how a job is profiled.
///
/// Implements `PartialEq`/`Eq` so configurations can be compared directly (for example in
/// tests or when deciding whether a cached profile is still applicable).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProfileConfig {
    /// Number of measured runs. `ProfileRunner::profile_job` rejects `0`.
    pub sample_size: u32,
    /// Number of unmeasured runs executed before sampling starts.
    pub warmup_runs: u32,
    /// Upper bound on the wall-clock time of a single run (warm-up or measured).
    pub max_execution_time: Duration,
}

impl Default for ProfileConfig {
    /// Returns the default settings: 10 samples, 2 warm-up runs and a 300 second timeout.
    fn default() -> Self {
        Self {
            sample_size: 10,
            warmup_runs: 2,
            max_execution_time: Duration::from_secs(300),
        }
    }
}
49
/// Raw resource figures captured for one measured run of a job.
#[derive(Debug, Clone)]
pub struct ResourceMeasurement {
    /// Wall-clock time the run took.
    pub duration: Duration,
    /// Memory figure attributed to the run, in bytes.
    ///
    /// The profiler in this file fills it with the growth of the process memory reading
    /// taken before and after the run.
    pub peak_memory_bytes: u64,
    /// CPU time (microseconds) the process consumed while the run was in flight.
    pub cpu_time_us: u64,
}
60
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct JobProfile {
64 pub avg_duration_ms: u64,
66 pub p95_duration_ms: u64,
68 pub p99_duration_ms: u64,
70 pub peak_memory_mb: u32,
72 pub stateful: bool,
74 pub persistent_connections: bool,
76 pub sample_size: u32,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct BlueprintProfiles {
86 pub blueprint_name: String,
88 pub profiled_at: String,
90 pub jobs: std::collections::HashMap<u32, JobProfile>,
92}
93
94impl BlueprintProfiles {
95 pub fn new(blueprint_name: impl Into<String>) -> Self {
97 Self {
98 blueprint_name: blueprint_name.into(),
99 profiled_at: chrono::Utc::now().to_rfc3339(),
100 jobs: std::collections::HashMap::new(),
101 }
102 }
103
104 pub fn add_job(&mut self, job_id: u32, profile: JobProfile) {
106 self.jobs.insert(job_id, profile);
107 }
108
109 pub fn save_to_file(&self, path: impl AsRef<std::path::Path>) -> Result<(), ProfilingError> {
113 let json = serde_json::to_string_pretty(self).map_err(|e| {
114 ProfilingError::InvalidConfiguration(format!("JSON serialization failed: {e}"))
115 })?;
116
117 std::fs::write(path.as_ref(), json).map_err(|e| {
118 ProfilingError::InvalidConfiguration(format!("Failed to write file: {e}"))
119 })?;
120
121 Ok(())
122 }
123
124 pub fn load_from_file(path: impl AsRef<std::path::Path>) -> Result<Self, ProfilingError> {
126 let content = std::fs::read_to_string(path.as_ref()).map_err(|e| {
127 ProfilingError::InvalidConfiguration(format!("Failed to read file: {e}"))
128 })?;
129
130 serde_json::from_str(&content).map_err(|e| {
131 ProfilingError::InvalidConfiguration(format!("JSON deserialization failed: {e}"))
132 })
133 }
134
135 pub fn to_compressed_bytes(&self) -> Result<Vec<u8>, ProfilingError> {
140 use flate2::write::GzEncoder;
141 use flate2::Compression;
142 use std::io::Write;
143
144 let json = serde_json::to_string(self).map_err(|e| {
146 ProfilingError::InvalidConfiguration(format!("JSON serialization failed: {e}"))
147 })?;
148
149 let mut encoder = GzEncoder::new(Vec::new(), Compression::best());
151 encoder.write_all(json.as_bytes()).map_err(|e| {
152 ProfilingError::InvalidConfiguration(format!("Compression failed: {e}"))
153 })?;
154
155 encoder.finish().map_err(|e| {
156 ProfilingError::InvalidConfiguration(format!("Compression finalization failed: {e}"))
157 })
158 }
159
160 pub fn from_compressed_bytes(compressed: &[u8]) -> Result<Self, ProfilingError> {
162 use flate2::read::GzDecoder;
163 use std::io::Read;
164
165 let mut decoder = GzDecoder::new(compressed);
167 let mut json = String::new();
168 decoder.read_to_string(&mut json).map_err(|e| {
169 ProfilingError::InvalidConfiguration(format!("Decompression failed: {e}"))
170 })?;
171
172 serde_json::from_str(&json).map_err(|e| {
174 ProfilingError::InvalidConfiguration(format!("JSON deserialization failed: {e}"))
175 })
176 }
177
178 pub fn to_base64_string(&self) -> Result<String, ProfilingError> {
183 use base64::Engine;
184 let compressed = self.to_compressed_bytes()?;
185 Ok(base64::engine::general_purpose::STANDARD.encode(&compressed))
186 }
187
188 pub fn from_base64_string(encoded: &str) -> Result<Self, ProfilingError> {
193 use base64::Engine;
194 let compressed = base64::engine::general_purpose::STANDARD
195 .decode(encoded)
196 .map_err(|e| {
197 ProfilingError::InvalidConfiguration(format!("Base64 decode failed: {e}"))
198 })?;
199 Self::from_compressed_bytes(&compressed)
200 }
201
202 pub fn to_description_field(&self) -> Result<String, ProfilingError> {
212 let encoded = self.to_base64_string()?;
213 Ok(format!("[PROFILING_DATA_V1]{encoded}"))
214 }
215
216 pub fn from_description_field(description: &str) -> Option<Result<Self, ProfilingError>> {
221 description
222 .strip_prefix("[PROFILING_DATA_V1]")
223 .map(Self::from_base64_string)
224 }
225}
226
/// Reports whether `description` begins with the `[PROFILING_DATA_V1]` marker.
///
/// Only the marker is checked; the payload that follows it is not validated.
pub fn has_profiling_data(description: &str) -> bool {
    description.strip_prefix("[PROFILING_DATA_V1]").is_some()
}
233
234#[cfg(unix)]
236fn get_current_memory_bytes() -> u64 {
237 use std::mem::MaybeUninit;
238
239 unsafe {
240 let mut usage = MaybeUninit::<libc::rusage>::uninit();
241 let result = libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr());
242
243 if result == 0 {
244 let usage = usage.assume_init();
245
246 #[cfg(target_os = "macos")]
248 return usage.ru_maxrss as u64;
249
250 #[cfg(target_os = "linux")]
251 return (usage.ru_maxrss as u64) * 1024;
252 }
253 }
254
255 0
256}
257
258#[cfg(not(unix))]
259fn get_current_memory_bytes() -> u64 {
260 0
261}
262
263#[cfg(unix)]
265fn get_cpu_time_us() -> u64 {
266 use std::mem::MaybeUninit;
267
268 unsafe {
269 let mut usage = MaybeUninit::<libc::rusage>::uninit();
270 let result = libc::getrusage(libc::RUSAGE_SELF, usage.as_mut_ptr());
271
272 if result == 0 {
273 let usage = usage.assume_init();
274 let user_us =
275 (usage.ru_utime.tv_sec as u64) * 1_000_000 + (usage.ru_utime.tv_usec as u64);
276 let sys_us =
277 (usage.ru_stime.tv_sec as u64) * 1_000_000 + (usage.ru_stime.tv_usec as u64);
278 return user_us + sys_us;
279 }
280 }
281
282 0
283}
284
285#[cfg(not(unix))]
286fn get_cpu_time_us() -> u64 {
287 0
288}
289
290pub struct ProfileRunner;
292
293impl ProfileRunner {
294 pub async fn profile_job<F, Fut>(
303 job_fn: F,
304 config: ProfileConfig,
305 ) -> Result<JobProfile, ProfilingError>
306 where
307 F: Fn() -> Fut,
308 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>,
309 {
310 if config.sample_size == 0 {
311 return Err(ProfilingError::InvalidConfiguration(
312 "sample_size must be greater than 0".to_string(),
313 ));
314 }
315
316 for _ in 0..config.warmup_runs {
318 let _ = tokio::time::timeout(config.max_execution_time, job_fn()).await;
319 }
320
321 let mut measurements = Vec::with_capacity(config.sample_size as usize);
323
324 for _ in 0..config.sample_size {
325 let mem_before = get_current_memory_bytes();
326 let cpu_before = get_cpu_time_us();
327 let start = Instant::now();
328
329 match tokio::time::timeout(config.max_execution_time, job_fn()).await {
331 Ok(Ok(())) => {
332 let duration = start.elapsed();
333 let mem_after = get_current_memory_bytes();
334 let cpu_after = get_cpu_time_us();
335
336 measurements.push(ResourceMeasurement {
337 duration,
338 peak_memory_bytes: mem_after.saturating_sub(mem_before),
339 cpu_time_us: cpu_after.saturating_sub(cpu_before),
340 });
341 }
342 Ok(Err(e)) => {
343 return Err(ProfilingError::ExecutionFailed(e.to_string()));
344 }
345 Err(_) => {
346 return Err(ProfilingError::ExecutionFailed(format!(
347 "Job execution exceeded maximum time of {}s",
348 config.max_execution_time.as_secs()
349 )));
350 }
351 }
352 }
353
354 if measurements.is_empty() {
355 return Err(ProfilingError::InsufficientSamples {
356 expected: config.sample_size,
357 actual: 0,
358 });
359 }
360
361 Ok(Self::compute_statistics(measurements, config.sample_size))
362 }
363
364 fn compute_statistics(measurements: Vec<ResourceMeasurement>, sample_size: u32) -> JobProfile {
366 let mut durations: Vec<u64> = measurements
367 .iter()
368 .map(|m| m.duration.as_millis() as u64)
369 .collect();
370 durations.sort_unstable();
371
372 let mut memories: Vec<u64> = measurements.iter().map(|m| m.peak_memory_bytes).collect();
373 memories.sort_unstable();
374
375 let avg_duration_ms = if !durations.is_empty() {
376 durations.iter().sum::<u64>() / durations.len() as u64
377 } else {
378 0
379 };
380
381 let p95_duration_ms = Self::percentile(&durations, 95);
382 let p99_duration_ms = Self::percentile(&durations, 99);
383 let peak_memory_mb = (memories.last().copied().unwrap_or(0) / (1024 * 1024)) as u32;
384
385 JobProfile {
386 avg_duration_ms,
387 p95_duration_ms,
388 p99_duration_ms,
389 peak_memory_mb,
390 stateful: false,
391 persistent_connections: false,
392 sample_size,
393 }
394 }
395
396 fn percentile(sorted_data: &[u64], percentile: u8) -> u64 {
398 if sorted_data.is_empty() {
399 return 0;
400 }
401
402 let index = ((sorted_data.len() as f64) * (f64::from(percentile) / 100.0)).ceil() as usize;
403 let index = index.saturating_sub(1).min(sorted_data.len() - 1);
404 sorted_data[index]
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a blueprint holding exactly one job profile.
    fn blueprint_with_single_job(name: &str, job_id: u32, profile: JobProfile) -> BlueprintProfiles {
        let mut profiles = BlueprintProfiles::new(name);
        profiles.add_job(job_id, profile);
        profiles
    }

    /// Builds a blueprint with `job_count` jobs (ids `0..job_count`) whose figures grow
    /// linearly with the job id.
    ///
    /// `duration_steps` holds the per-id increments for the average, p95 and p99 durations;
    /// `memory_step` is the per-id increment of the memory figure.
    fn blueprint_with_jobs(
        name: &str,
        job_count: u32,
        duration_steps: (u64, u64, u64),
        memory_step: u32,
    ) -> BlueprintProfiles {
        let (avg_step, p95_step, p99_step) = duration_steps;
        let mut profiles = BlueprintProfiles::new(name);

        for id in 0..job_count {
            let scale = u64::from(id);
            profiles.add_job(
                id,
                JobProfile {
                    avg_duration_ms: 100 + scale * avg_step,
                    p95_duration_ms: 150 + scale * p95_step,
                    p99_duration_ms: 200 + scale * p99_step,
                    peak_memory_mb: 256 + id * memory_step,
                    stateful: id % 5 == 0,
                    persistent_connections: id % 7 == 0,
                    sample_size: 10,
                },
            );
        }

        profiles
    }

    // A job that sleeps 10 ms must profile successfully with an average of at least 10 ms.
    #[tokio::test]
    async fn test_profile_simple_job() {
        let outcome = ProfileRunner::profile_job(
            || async {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok(())
            },
            ProfileConfig {
                sample_size: 5,
                warmup_runs: 1,
                max_execution_time: Duration::from_secs(10),
            },
        )
        .await;

        assert!(outcome.is_ok());
        let profile = outcome.unwrap();
        assert_eq!(profile.sample_size, 5);
        assert!(profile.avg_duration_ms >= 10);
    }

    // A job that returns an error must surface as `ExecutionFailed`.
    #[tokio::test]
    async fn test_profile_failing_job() {
        let outcome = ProfileRunner::profile_job(
            || async { Err::<(), _>("test error".into()) },
            ProfileConfig {
                sample_size: 3,
                warmup_runs: 0,
                max_execution_time: Duration::from_secs(10),
            },
        )
        .await;

        assert!(outcome.is_err());
        assert!(matches!(
            outcome.unwrap_err(),
            ProfilingError::ExecutionFailed(_)
        ));
    }

    // A job that outlives the configured limit must be reported as an error.
    #[tokio::test]
    async fn test_profile_timeout() {
        let outcome = ProfileRunner::profile_job(
            || async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                Ok(())
            },
            ProfileConfig {
                sample_size: 2,
                warmup_runs: 0,
                max_execution_time: Duration::from_millis(50),
            },
        )
        .await;

        assert!(outcome.is_err());
    }

    #[test]
    fn test_percentile_calculation() {
        let data: Vec<u64> = (1..=10).collect();

        for (requested, expected) in [(50, 5), (95, 10), (99, 10)] {
            assert_eq!(ProfileRunner::percentile(&data, requested), expected);
        }
    }

    // Only Unix targets have a real memory reading; elsewhere the helper returns 0.
    #[test]
    fn test_memory_measurement() {
        if cfg!(unix) {
            assert!(get_current_memory_bytes() > 0);
        }
    }

    #[test]
    fn test_compression_single_job() {
        let profiles = blueprint_with_single_job(
            "test",
            0,
            JobProfile {
                avg_duration_ms: 100,
                p95_duration_ms: 150,
                p99_duration_ms: 200,
                peak_memory_mb: 256,
                stateful: false,
                persistent_connections: false,
                sample_size: 10,
            },
        );

        let bytes = profiles.to_compressed_bytes().unwrap();
        println!("Compressed size (1 job): {} bytes", bytes.len());
        assert!(
            bytes.len() < 250,
            "Compression too large: {} bytes",
            bytes.len()
        );

        let restored = BlueprintProfiles::from_compressed_bytes(&bytes).unwrap();
        assert_eq!(restored.blueprint_name, profiles.blueprint_name);
        assert_eq!(restored.jobs.len(), 1);
        assert_eq!(restored.jobs.get(&0).unwrap().avg_duration_ms, 100);
    }

    #[test]
    fn test_compression_multiple_jobs() {
        let profiles = blueprint_with_jobs("complex-blueprint", 10, (50, 60, 70), 64);

        let bytes = profiles.to_compressed_bytes().unwrap();
        println!("Compressed size (10 jobs): {} bytes", bytes.len());
        assert!(
            bytes.len() < 700,
            "Compression too large: {} bytes",
            bytes.len()
        );

        let restored = BlueprintProfiles::from_compressed_bytes(&bytes).unwrap();
        assert_eq!(restored.jobs.len(), 10);
        assert_eq!(restored.jobs.get(&0).unwrap().peak_memory_mb, 256);
        assert!(restored.jobs.get(&5).unwrap().stateful);
        assert!(restored.jobs.get(&7).unwrap().persistent_connections);
    }

    #[test]
    fn test_compression_large_blueprint() {
        let profiles = blueprint_with_jobs("massive-blueprint", 50, (20, 25, 30), 32);

        let bytes = profiles.to_compressed_bytes().unwrap();
        println!("Compressed size (50 jobs): {} bytes", bytes.len());
        assert!(
            bytes.len() < 3000,
            "Compression too large: {} bytes",
            bytes.len()
        );

        let restored = BlueprintProfiles::from_compressed_bytes(&bytes).unwrap();
        assert_eq!(restored.jobs.len(), 50);
        assert_eq!(restored.blueprint_name, "massive-blueprint");
    }

    #[test]
    fn test_compression_roundtrip_preserves_data() {
        let profiles = blueprint_with_single_job(
            "test",
            42,
            JobProfile {
                avg_duration_ms: 12345,
                p95_duration_ms: 23456,
                p99_duration_ms: 34567,
                peak_memory_mb: 4096,
                stateful: true,
                persistent_connections: true,
                sample_size: 100,
            },
        );

        let bytes = profiles.to_compressed_bytes().unwrap();
        let restored = BlueprintProfiles::from_compressed_bytes(&bytes).unwrap();

        // `JobProfile` derives `PartialEq`, so one comparison covers every field.
        assert_eq!(
            profiles.jobs.get(&42).unwrap(),
            restored.jobs.get(&42).unwrap()
        );
    }

    #[test]
    fn test_base64_encoding_for_chain_storage() {
        let profiles = blueprint_with_single_job(
            "incredible-squaring",
            0,
            JobProfile {
                avg_duration_ms: 5,
                p95_duration_ms: 8,
                p99_duration_ms: 10,
                peak_memory_mb: 256,
                stateful: false,
                persistent_connections: false,
                sample_size: 10,
            },
        );

        let text = profiles.to_base64_string().unwrap();
        println!("Base64 encoded size: {} bytes", text.len());
        assert!(
            text.len() < 400,
            "Base64 size should be < 400 bytes for 1 job"
        );

        let restored = BlueprintProfiles::from_base64_string(&text).unwrap();
        assert_eq!(restored.blueprint_name, "incredible-squaring");
        assert_eq!(restored.jobs.len(), 1);

        let job = restored.jobs.get(&0).unwrap();
        assert_eq!(job.avg_duration_ms, 5);
        assert_eq!(job.peak_memory_mb, 256);
    }

    #[test]
    fn test_base64_encoding_multiple_jobs() {
        let profiles = blueprint_with_jobs("complex-blueprint", 10, (50, 60, 70), 64);

        let text = profiles.to_base64_string().unwrap();
        println!("Base64 encoded size (10 jobs): {} bytes", text.len());
        assert!(
            text.len() < 1000,
            "Base64 size should be < 1KB for 10 jobs"
        );

        let restored = BlueprintProfiles::from_base64_string(&text).unwrap();
        assert_eq!(restored.jobs.len(), 10);
        assert_eq!(restored.jobs.get(&0).unwrap().peak_memory_mb, 256);
        assert!(restored.jobs.get(&5).unwrap().stateful);
    }
}