from pathlib import Path
import shutil
import time
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader
import unittest
try:
import psutil
except ImportError: psutil = None
from bevy_zeroverse_dataloader import BevyZeroverseDataset, \
ChunkedIteratorDataset, FolderDataset, MP4Dataset, \
chunk_and_save, load_chunk, save_to_folders, save_to_mp4, write_sample
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader
import unittest
from bevy_zeroverse_dataloader import BevyZeroverseDataset, \
ChunkedIteratorDataset, FolderDataset, MP4Dataset, \
chunk_and_save, load_chunk, save_to_folders, save_to_mp4, write_sample
def visualize(batch):
    """Show every color image of ``batch`` in a single matplotlib grid.

    Args:
        batch: mapping with a ``'color'`` tensor. A 6-D tensor is treated as a
            chunked batch and its two leading axes are merged into one sample
            axis; afterwards the array is indexed as ``[sample, camera]`` and
            each entry is handed to ``imshow``.

    Blocks in ``plt.show()`` until the window is closed. Pressing any key
    closes the figure; pressing Escape additionally exits the process.
    """
    import sys

    color = batch['color']
    print(color.shape)

    # detach()/cpu() keep the conversion working for tensors that track
    # gradients or live on an accelerator.
    color_images = color.detach().cpu().numpy()
    if color_images.ndim == 6:
        # Chunked batch: fold the loader batch axis into the sample axis.
        # Equivalent to squeeze(0) when the leading axis has size 1, but it
        # does not raise for larger loader batch sizes.
        color_images = color_images.reshape(-1, *color_images.shape[2:])

    batch_size, num_cameras = color_images.shape[:2]

    # Only the color image is drawn, so each (sample, camera) pair needs
    # exactly one grid cell.
    total_images = batch_size * num_cameras
    if total_images == 0:
        # plt.subplots cannot build a grid with zero rows.
        return

    cols = 9
    rows = (total_images + cols - 1) // cols  # ceiling division

    fig, axes = plt.subplots(rows, cols, figsize=(20, 20))
    axes = axes.flatten()

    for batch_idx in range(batch_size):
        for cam_idx in range(num_cameras):
            ax = axes[batch_idx * num_cameras + cam_idx]
            ax.imshow(color_images[batch_idx, cam_idx])
            ax.axis('off')
            # Titles are only added for small batches.
            if batch_size <= 2:
                ax.set_title(f'({batch_idx + 1}, {cam_idx + 1})_color')

    # Hide the unused cells at the end of the last row.
    for ax in axes[total_images:]:
        ax.axis('off')

    plt.tight_layout()
    plt.subplots_adjust(wspace=0, hspace=0)

    def on_key(event):
        """Close the figure on any key; Escape also terminates the process."""
        plt.close(fig)
        if event.key == 'escape':
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to be defined in every interpreter configuration.
            sys.exit(0)

    fig.canvas.mpl_connect('key_press_event', on_key)
    plt.show()
def benchmark(dataloader, max_batches=100):
    """Time iteration over ``dataloader`` and print the mean seconds per batch.

    Args:
        dataloader: iterable yielding mappings whose ``'color'`` entry exposes
            a ``shape`` attribute (printed for every batch).
        max_batches: stop after this many batches. ``None`` consumes the whole
            iterable. Defaults to 100.

    Returns:
        The mean number of seconds per batch, or ``None`` when the iterable
        produced no batch at all.

    Raises:
        ValueError: if ``max_batches`` is given and is not positive.
    """
    if max_batches is not None and max_batches <= 0:
        raise ValueError('max_batches must be a positive integer or None')

    count = 0
    # perf_counter is monotonic, so the measurement is immune to wall-clock
    # adjustments that can skew time.time().
    start = time.perf_counter()
    for batch in dataloader:
        print('batch shape:', batch['color'].shape)
        count += 1
        if max_batches is not None and count >= max_batches:
            break
    elapsed = time.perf_counter() - start

    if count == 0:
        # Avoid dividing by zero when the loader is empty.
        print('seconds per batch: n/a (dataloader yielded no batches)')
        return None

    seconds_per_batch = elapsed / count
    print('seconds per batch:', seconds_per_batch)
    return seconds_per_batch
def test():
    """Generate room scenes on the fly and display every batch.

    Builds a headless ``BevyZeroverseDataset`` with six cameras at 640x480,
    wraps it in a shuffling ``DataLoader`` (batch size 5, two workers) and
    passes each batch to ``visualize``.

    NOTE(review): ``num_samples`` is passed as the float ``1e6`` -- confirm the
    dataset accepts a non-integer sample count.
    """
    loader = DataLoader(
        BevyZeroverseDataset(
            editor=False,
            headless=True,
            num_cameras=6,
            width=640,
            height=480,
            num_samples=1e6,
            scene_type='room',
        ),
        batch_size=5,
        shuffle=True,
        num_workers=2,
    )

    for current_batch in loader:
        visualize(current_batch)
# NOTE(review): this flag is not read anywhere in the visible code -- it looks
# like a leftover; confirm there are no external users before removing it.
generated = False
class TestChunkedDataset(unittest.TestCase):
    """Tests for the chunk, folder and mp4 on-disk dataset formats.

    A small ``BevyZeroverseDataset`` is generated once per class and written in
    all three formats below ``./data/zeroverse/test``. The directory is removed
    again when the class finishes, or when fixture generation fails.
    """

    @classmethod
    def setUpClass(cls):
        """Generate the shared fixture data once for all tests of the class."""
        # Run without the editor: the dataset is generated headless, matching
        # every other dataset construction in this file.
        cls.editor = False
        cls.headless = True
        cls.num_cameras = 4
        cls.width = 640
        cls.height = 480
        cls.num_samples = 10
        cls.bytes_per_chunk = int(256 * 1024 * 1024)
        cls.samples_per_chunk = 3
        cls.stage = "test"
        cls.output_dir = Path("./data/zeroverse") / cls.stage

        # Start from a clean slate so stale files from an earlier run cannot
        # leak into this one.
        if cls.output_dir.exists():
            shutil.rmtree(cls.output_dir)

        try:
            cls.dataset = BevyZeroverseDataset(
                cls.editor,
                cls.headless,
                cls.num_cameras,
                cls.width,
                cls.height,
                cls.num_samples,
                scene_type='room',
            )

            cls.chunk_paths = chunk_and_save(
                cls.dataset,
                cls.output_dir / 'chunk',
                bytes_per_chunk=cls.bytes_per_chunk,
                samples_per_chunk=cls.samples_per_chunk,
                n_workers=0,
            )

            save_to_folders(
                cls.dataset,
                cls.output_dir / 'fs',
                n_workers=0,
            )

            save_to_mp4(
                cls.dataset,
                cls.output_dir / 'mp4',
                n_workers=0,
            )
        except BaseException:
            # unittest skips tearDownClass when setUpClass raises, so partial
            # output has to be removed here before re-raising.
            shutil.rmtree(cls.output_dir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        """Remove everything the class wrote to disk."""
        if cls.output_dir.exists():
            shutil.rmtree(cls.output_dir)

    def setUp(self):
        """Expose the class-level fixture paths on the test instance."""
        self.output_dir = type(self).output_dir
        self.chunk_paths = type(self).chunk_paths

    def test_benchmark_chunked_dataloader(self):
        """Benchmark iteration over the chunked dataset."""
        chunked_dataset = ChunkedIteratorDataset(self.output_dir / 'chunk')
        dataloader = DataLoader(chunked_dataset, batch_size=1, shuffle=False)

        print("\nBenchmarking chunks:")
        benchmark(dataloader)

    def test_benchmark_folder_dataloader(self):
        """Benchmark iteration over the folder-based dataset."""
        folder_dataset = FolderDataset(self.output_dir / 'fs')
        dataloader = DataLoader(folder_dataset, batch_size=1, shuffle=False)

        print("\nBenchmarking folder:")
        benchmark(dataloader)

    def test_benchmark_mp4_dataloader(self):
        """Benchmark iteration over the mp4-based dataset."""
        mp4_dataset = MP4Dataset(self.output_dir / 'mp4')
        dataloader = DataLoader(mp4_dataset, batch_size=1, shuffle=False)

        print("\nBenchmarking mp4:")
        benchmark(dataloader)

    def test_update_sample_in_chunk(self):
        """Overwrite one sample in a chunk and check its neighbour is untouched."""
        chunked_ds = ChunkedIteratorDataset(self.output_dir / 'chunk', shuffle=False)
        sample = next(iter(chunked_ds))

        # Each sample records which chunk file and which slot it came from.
        chunk_path = Path(sample["_chunk_path"])
        sample_idx = sample["_sample_idx"]

        chunk_before = load_chunk(chunk_path)
        B = chunk_before["color"].shape[0]
        self.assertGreaterEqual(B, 1, "chunk unexpectedly empty")

        # Remember a different sample of the same chunk (if one exists) so we
        # can verify that the write below leaves it alone.
        other_idx = (sample_idx + 1) % B if B > 1 else None
        if other_idx is not None:
            other_color_before = chunk_before["color"][other_idx].clone()

        new_color = torch.zeros_like(sample["color"])
        mod_sample = {
            # Drop the bookkeeping keys (leading underscore) and re-add the
            # two that address the sample inside its chunk.
            **{k: v for k, v in sample.items() if not k.startswith("_")},
            "color": new_color,
            "_chunk_path": str(chunk_path),
            "_sample_idx": sample_idx,
        }

        write_sample(mod_sample)

        chunk_after = load_chunk(chunk_path)
        updated_color = chunk_after["color"][sample_idx]
        # The sample was overwritten with zeros; a small tolerance is allowed
        # on the mean rather than requiring exact equality.
        self.assertLess(
            updated_color.mean().item(),
            0.05,
            "updated sample did not change as expected",
        )

        if other_idx is not None:
            other_color_after = chunk_after["color"][other_idx]
            self.assertTrue(
                torch.equal(other_color_before, other_color_after),
                "neighbouring sample was unintentionally modified",
            )
class TestMemoryUsage(unittest.TestCase):
    """Guards against runaway memory growth during repeated chunk generation."""

    def test_chunk_generation_memory_stability(self):
        """Generate chunks several times and bound the growth in process RSS.

        Skipped when ``psutil`` is not installed. Output is written below
        ``./data/zeroverse/memory_test`` and removed when the test ends.
        """
        if psutil is None:
            self.skipTest("psutil is required for memory checks")

        proc = psutil.Process()
        baseline = proc.memory_info().rss

        root = Path("./data/zeroverse/memory_test")
        if root.exists():
            shutil.rmtree(root)
        root.mkdir(parents=True, exist_ok=True)
        # Remove the generated data whether the test passes or fails, so
        # repeated runs do not accumulate files on disk.
        self.addCleanup(shutil.rmtree, root, ignore_errors=True)

        runs = 3
        for idx in range(runs):
            run_dir = root / f"run_{idx}"
            dataset = BevyZeroverseDataset(
                editor=False,
                headless=True,
                num_cameras=2,
                width=640,
                height=480,
                num_samples=6,
                scene_type='room',
            )

            chunk_and_save(
                dataset,
                run_dir,
                bytes_per_chunk=64 * 1024 * 1024,
                samples_per_chunk=3,
                n_workers=0,
                full_size_only=False,
                memory_cleanup=True,
            )

            # Short pause between runs before the next generation starts.
            time.sleep(0.1)

        final = proc.memory_info().rss
        growth = final - baseline
        max_allowed = 512 * 1024 * 1024  # 512 MiB of RSS growth across all runs
        self.assertLess(
            growth,
            max_allowed,
            f"RSS increased by {growth / 1e6:.2f} MB which exceeds the allowed threshold",
        )
def main():
    """Discover and run the unittest test cases defined in this module."""
    unittest.main()


# Only run the suite when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()