import math
import tempfile
from rcf3 import Forest
def test_creating_forest_basic():
    """Construct a Forest from the basic options and check its tree count."""
    expected_trees = 50
    settings = {
        "input_dim": 2,
        "shingle_size": 1,
        "num_trees": expected_trees,
        "capacity": 256,
    }

    forest = Forest(**settings)

    # The forest must report exactly the number of trees it was asked for.
    assert forest.num_trees() == expected_trees
    print("✓ test_creating_forest_basic passed")
def test_creating_forest_with_time_series():
    """Construct a Forest with shingling and time-decay options set.

    Only the reported tree count is verified; the remaining options are
    simply passed through to the constructor.
    """
    tree_count = 100
    options = dict(
        input_dim=4,
        shingle_size=8,
        num_trees=tree_count,
        capacity=512,
        time_decay=0.01,
        internal_shingling=True,
    )

    forest = Forest(**options)

    assert forest.num_trees() == tree_count
    print("✓ test_creating_forest_with_time_series passed")
def test_basic_operations():
    """Update a forest with one point, score it when ready, and report entries seen."""
    forest = Forest(num_trees=50, capacity=256, input_dim=2)

    sample = [1.5, 2.3]
    forest.update(sample)

    # NOTE(review): the score is only computed (and asserted on) when
    # is_ready() is truthy. If a single update does not make the forest
    # ready, this block is skipped and nothing is asserted -- confirm intent.
    if forest.is_ready():
        score = forest.score(sample)
        print(f"Anomaly score: {score}")
        assert score >= 0.0

    print(f"Entries seen: {forest.entries_seen()}")
    print("✓ test_basic_operations passed")
def test_scoring_methods():
    """Exercise score(), displacement_score() and density() on a trained forest.

    The forest is fed the same point 100 times; each of the three methods is
    then called on that point and its result is checked to be non-negative.
    """
    # The query point has the same coordinates as every training point.
    point = [1.5, 2.3, -0.5]

    forest = Forest(input_dim=3, capacity=256, num_trees=50)
    for _ in range(100):
        forest.update(point)

    score = forest.score(point)
    assert score >= 0.0
    print(f"RCF Score: {score}")

    displacement = forest.displacement_score(point)
    assert displacement >= 0.0
    print(f"Displacement Score: {displacement}")

    density = forest.density(point)
    assert density >= 0.0
    print(f"Density: {density}")

    print("✓ test_scoring_methods passed")
def test_feature_attribution():
    """Check that attribution() yields non-negative 'below'/'above' values per entry."""
    forest = Forest(input_dim=3, capacity=256, num_trees=50)

    baseline = [1.0, 2.0, 3.0]
    for _ in range(100):
        forest.update(baseline)

    # The last coordinate is far away from the value the forest was trained on.
    outlier = [1.5, 2.3, 100.0]

    for i, attr in enumerate(forest.attribution(outlier)):
        print(f"Dimension {i}: below={attr['below']}, above={attr['above']}")
        for side in ("below", "above"):
            assert attr[side] >= 0.0

    print("✓ test_feature_attribution passed")
def test_neighborhood_search():
    """Query near_neighbors() after loading two groups of points and print the results.

    This example only prints what the forest returns; it reads the
    'distance', 'score' and 'point' entries of every neighbor.
    """
    forest = Forest(input_dim=2, capacity=256, num_trees=50)

    # Two visually separate groups: one around (1, 2), one around (5, 6).
    low_group = [
        [1.0, 2.0],
        [1.1, 2.1],
        [1.2, 2.2],
        [1.3, 2.3],
        [1.4, 2.4],
    ]
    high_group = [
        [5.0, 6.0],
        [5.1, 6.1],
        [5.2, 6.2],
    ]
    for sample in low_group + high_group:
        forest.update(sample)

    query_point = [1.5, 2.3]
    neighbors = forest.near_neighbors(query_point, top_k=3, percentile=50)

    print(f"Found {len(neighbors)} neighbors:")
    for neighbor in neighbors:
        print(
            f" Distance: {neighbor['distance']}, Score: {neighbor['score']}, Point: {neighbor['point']}"
        )

    print("✓ test_neighborhood_search passed")
def test_missing_value_imputation():
    """Impute a NaN coordinate and verify the result is a full-length, NaN-free vector."""
    forest = Forest(input_dim=3, capacity=256, num_trees=50)

    # Only the first coordinate varies across the 100 training points.
    for step in range(100):
        forest.update([1.0 + step * 0.01, 2.0, 3.0])

    incomplete = [1.5, float("nan"), 3.0]
    missing_positions = [1]  # index of the NaN entry in `incomplete`
    imputed = forest.impute(incomplete, missing_positions, centrality=1.0)

    print(f"Imputed value at index 1: {imputed[1]}")
    assert not math.isnan(imputed[1])
    assert len(imputed) == 3

    print("✓ test_missing_value_imputation passed")
def test_time_series_forecasting():
    """Feed a short 4-dimensional stream and, when ready, extrapolate 5 steps ahead."""
    forest = Forest(input_dim=4, shingle_size=8, internal_shingling=True)

    observations = [
        [1.0, 2.0, 3.0, 4.0],
        [1.1, 2.1, 3.1, 4.1],
        [1.2, 2.2, 3.2, 4.2],
        [1.3, 2.3, 3.3, 4.3],
        [1.4, 2.4, 3.4, 4.4],
        [1.5, 2.5, 3.5, 4.5],
        [1.6, 2.6, 3.6, 4.6],
        [1.7, 2.7, 3.7, 4.7],
        [1.8, 2.8, 3.8, 4.8],
        [1.9, 2.9, 3.9, 4.9],
    ]
    for row in observations:
        forest.update(row)

    # NOTE(review): the extrapolation and its length check run only when
    # is_ready() is truthy; with ten observations this may be skipped -- confirm.
    if forest.is_ready():
        predictions = forest.extrapolate(5)
        print(f"Predictions length: {len(predictions)}")
        # Presumably 5 steps x 4 dimensions flattened -- TODO confirm.
        assert len(predictions) == 20

    print("✓ test_time_series_forecasting passed")
def test_serialization():
    """Round-trip a forest through to_json()/from_json() and compare tree counts."""
    forest = Forest(input_dim=2, capacity=256, num_trees=50)

    sample = [1.5, 2.3]
    for _ in range(50):
        forest.update(sample)

    json_str = forest.to_json()
    assert len(json_str) > 0
    print(f"JSON length: {len(json_str)}")

    restored = Forest.from_json(json_str)
    assert restored.num_trees() == forest.num_trees()

    print("✓ test_serialization passed")
def test_pickle_serialization():
    """Pickle a forest to a temporary file, load it back, and compare tree counts."""
    import pickle

    forest = Forest(input_dim=2, capacity=256, num_trees=50)
    sample = [1.5, 2.3]
    for _ in range(50):
        forest.update(sample)

    # The temporary directory (and the pickle inside it) is removed on exit.
    with tempfile.TemporaryDirectory() as tmp_dir:
        pickle_path = f"{tmp_dir}/forest.pkl"

        with open(pickle_path, "wb") as sink:
            pickle.dump(forest, sink)

        with open(pickle_path, "rb") as source:
            restored = pickle.load(source)

        assert restored.num_trees() == forest.num_trees()

    print("✓ test_pickle_serialization passed")
def test_anomaly_detection_example():
    """Train on a slowly drifting series, then score a stream containing one extreme point.

    Every streamed point is scored (when the forest is ready) before it is
    added to the forest. Points scoring above 0.1 are counted, and at least
    one such point is required for the test to pass.
    """
    forest = Forest(input_dim=3, capacity=256, num_trees=50)

    # 200 training points; all three coordinates grow by 0.01 per step.
    for step in range(200):
        offset = step * 0.01
        forest.update([base + offset for base in (1.0, 2.0, 3.0)])

    stream = [
        [1.0, 2.0, 3.0],
        [1.1, 2.1, 3.1],
        [1.2, 2.2, 3.2],
        [100.0, 200.0, 300.0],
        [1.3, 2.3, 3.3],
    ]

    anomaly_count = 0
    for point in stream:
        if forest.is_ready():
            score = forest.score(point)
            attribution = forest.attribution(point)
            print(f"Point: {point}, Score: {score}")

            if score > 0.1:
                print(f"Anomaly detected: score={score}")
                for i, attr in enumerate(attribution):
                    print(f" Dimension {i}: {attr['above']:.2f}")
                anomaly_count += 1

        # Score first, then learn: the point joins the forest after scoring.
        forest.update(point)

    print(f"Total anomalies detected: {anomaly_count}")
    assert anomaly_count > 0

    print("✓ test_anomaly_detection_example passed")
if __name__ == "__main__":
    # Script entry point: run every README example test in order and exit
    # with status 1 on the first failure.
    import sys
    import traceback

    readme_tests = (
        test_creating_forest_basic,
        test_creating_forest_with_time_series,
        test_basic_operations,
        test_scoring_methods,
        test_feature_attribution,
        test_neighborhood_search,
        test_missing_value_imputation,
        test_time_series_forecasting,
        test_serialization,
        test_pickle_serialization,
        test_anomaly_detection_example,
    )

    print("Running Python README examples tests...\n")
    try:
        for readme_test in readme_tests:
            readme_test()
        print("\n✅ All tests passed!")
    except AssertionError as e:
        print(f"\n❌ Test failed: {e}")
        # The tests use bare `assert` statements, so str(e) is empty; the
        # traceback is the only thing that shows which test and line failed.
        traceback.print_exc()
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        # Keep the stack so an unexpected error can be located, not just named.
        traceback.print_exc()
        sys.exit(1)